You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2018/09/18 02:06:26 UTC
[50/50] [abbrv] hadoop git commit: HDFS-13778. [SBN read]
TestStateAlignmentContextWithHA should use real ObserverReadProxyProvider
instead of AlignmentContextProxyProvider. Contributed by Konstantin Shvachko
and Plamen Jeliazkov.
HDFS-13778. [SBN read] TestStateAlignmentContextWithHA should use real ObserverReadProxyProvider instead of AlignmentContextProxyProvider. Contributed by Konstantin Shvachko and Plamen Jeliazkov.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c377e3ca
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c377e3ca
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c377e3ca
Branch: refs/heads/HDFS-12943
Commit: c377e3ca40b92f30e6223a0a4a1014b68ec59442
Parents: 30d1cd9
Author: Konstantin V Shvachko <sh...@apache.org>
Authored: Mon Sep 17 18:25:27 2018 -0700
Committer: Konstantin V Shvachko <sh...@apache.org>
Committed: Mon Sep 17 19:00:20 2018 -0700
----------------------------------------------------------------------
.../hdfs/TestStateAlignmentContextWithHA.java | 186 ++++++-------------
1 file changed, 57 insertions(+), 129 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c377e3ca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java
index 1acbd75..a494252 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java
@@ -18,28 +18,24 @@
package org.apache.hadoop.hdfs;
-import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertThat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.namenode.ha.ClientHAProxyFactory;
-import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
-import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
-import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
@@ -61,55 +57,31 @@ import java.util.concurrent.TimeUnit;
* to the most recent alignment state of the server.
*/
public class TestStateAlignmentContextWithHA {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(TestStateAlignmentContextWithHA.class.getName());
private static final int NUMDATANODES = 1;
private static final int NUMCLIENTS = 10;
- private static final int NUMFILES = 300;
+ private static final int NUMFILES = 120;
private static final Configuration CONF = new HdfsConfiguration();
- private static final String NAMESERVICE = "nameservice";
private static final List<ClientGSIContext> AC_LIST = new ArrayList<>();
private static MiniDFSCluster cluster;
private static List<Worker> clients;
- private static ClientGSIContext spy;
private DistributedFileSystem dfs;
private int active = 0;
private int standby = 1;
- static class AlignmentContextProxyProvider<T>
- extends ConfiguredFailoverProxyProvider<T> {
+ static class ORPPwithAlignmentContexts<T extends ClientProtocol>
+ extends ObserverReadProxyProvider<T> {
- private ClientGSIContext alignmentContext;
-
- public AlignmentContextProxyProvider(
+ public ORPPwithAlignmentContexts(
Configuration conf, URI uri, Class<T> xface,
HAProxyFactory<T> factory) throws IOException {
super(conf, uri, xface, factory);
- // Create and set AlignmentContext in HAProxyFactory.
- // All proxies by factory will now have AlignmentContext assigned.
- this.alignmentContext = (spy != null ? spy : new ClientGSIContext());
- ((ClientHAProxyFactory<T>) factory).setAlignmentContext(alignmentContext);
-
- AC_LIST.add(alignmentContext);
- }
- }
-
- static class SpyConfiguredContextProxyProvider<T>
- extends ConfiguredFailoverProxyProvider<T> {
-
- private ClientGSIContext alignmentContext;
-
- public SpyConfiguredContextProxyProvider(
- Configuration conf, URI uri, Class<T> xface,
- HAProxyFactory<T> factory) throws IOException {
- super(conf, uri, xface, factory);
-
- // Create but DON'T set in HAProxyFactory.
- this.alignmentContext = (spy != null ? spy : new ClientGSIContext());
-
- AC_LIST.add(alignmentContext);
+ AC_LIST.add((ClientGSIContext) getAlignmentContext());
}
}
@@ -121,23 +93,21 @@ public class TestStateAlignmentContextWithHA {
CONF.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
CONF.setBoolean("fs.hdfs.impl.disable.cache", true);
- MiniDFSNNTopology.NSConf nsConf = new MiniDFSNNTopology.NSConf(NAMESERVICE);
- nsConf.addNN(new MiniDFSNNTopology.NNConf("nn1"));
- nsConf.addNN(new MiniDFSNNTopology.NNConf("nn2"));
-
cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(NUMDATANODES)
- .nnTopology(MiniDFSNNTopology.simpleHATopology().addNameservice(nsConf))
+ .nnTopology(MiniDFSNNTopology.simpleHATopology(3))
.build();
cluster.waitActive();
cluster.transitionToActive(0);
+ cluster.transitionToObserver(2);
+
+ String nameservice = HATestUtil.getLogicalHostname(cluster);
+ HATestUtil.setFailoverConfigurations(cluster, CONF, nameservice, 0);
+ CONF.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX +
+ "." + nameservice, ORPPwithAlignmentContexts.class.getName());
}
@Before
public void before() throws IOException, URISyntaxException {
- killWorkers();
- HATestUtil.setFailoverConfigurations(cluster, CONF, NAMESERVICE, 0);
- CONF.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX +
- "." + NAMESERVICE, AlignmentContextProxyProvider.class.getName());
dfs = (DistributedFileSystem) FileSystem.get(CONF);
}
@@ -151,6 +121,7 @@ public class TestStateAlignmentContextWithHA {
@After
public void after() throws IOException {
+ killWorkers();
cluster.transitionToStandby(1);
cluster.transitionToActive(0);
active = 0;
@@ -160,26 +131,6 @@ public class TestStateAlignmentContextWithHA {
dfs = null;
}
AC_LIST.clear();
- spy = null;
- }
-
- /**
- * This test checks if after a client writes we can see the state id in
- * updated via the response.
- */
- @Test
- public void testNoStateOnConfiguredProxyProvider() throws Exception {
- Configuration confCopy = new Configuration(CONF);
- confCopy.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX +
- "." + NAMESERVICE, SpyConfiguredContextProxyProvider.class.getName());
-
- try (DistributedFileSystem clearDfs =
- (DistributedFileSystem) FileSystem.get(confCopy)) {
- ClientGSIContext clientState = getContext(1);
- assertThat(clientState.getLastSeenStateId(), is(Long.MIN_VALUE));
- DFSTestUtil.writeFile(clearDfs, new Path("/testFileNoState"), "no_state");
- assertThat(clientState.getLastSeenStateId(), is(Long.MIN_VALUE));
- }
}
/**
@@ -234,48 +185,6 @@ public class TestStateAlignmentContextWithHA {
}
/**
- * This test mocks an AlignmentContext and ensures that DFSClient
- * writes its lastSeenStateId into RPC requests.
- */
- @Test
- public void testClientSendsState() throws Exception {
- ClientGSIContext alignmentContext = new ClientGSIContext();
- ClientGSIContext spiedAlignContext = Mockito.spy(alignmentContext);
- spy = spiedAlignContext;
-
- try (DistributedFileSystem clearDfs =
- (DistributedFileSystem) FileSystem.get(CONF)) {
-
- // Collect RpcRequestHeaders for verification later.
- final List<RpcHeaderProtos.RpcRequestHeaderProto.Builder> headers =
- new ArrayList<>();
- Mockito.doAnswer(a -> {
- Object[] arguments = a.getArguments();
- RpcHeaderProtos.RpcRequestHeaderProto.Builder header =
- (RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0];
- headers.add(header);
- return a.callRealMethod();
- }).when(spiedAlignContext).updateRequestState(Mockito.any());
-
- DFSTestUtil.writeFile(clearDfs, new Path("/testFile4"), "shv");
-
- // Ensure first header and last header have different state.
- assertThat(headers.size() > 1, is(true));
- assertThat(headers.get(0).getStateId(),
- is(not(headers.get(headers.size() - 1))));
-
- // Ensure collected RpcRequestHeaders are in increasing order.
- long lastHeader = headers.get(0).getStateId();
- for (RpcHeaderProtos.RpcRequestHeaderProto.Builder header :
- headers.subList(1, headers.size())) {
- long currentHeader = header.getStateId();
- assertThat(currentHeader >= lastHeader, is(true));
- lastHeader = header.getStateId();
- }
- }
- }
-
- /**
* This test checks that, after a client writes, we can see the state id
* updated via the response.
*/
@@ -310,14 +219,22 @@ public class TestStateAlignmentContextWithHA {
@Test(timeout=300000)
public void testMultiClientStatesWithRandomFailovers() throws Exception {
- // We want threads to run during failovers; assuming at minimum 4 cores,
- // would like to see 2 clients competing against 2 NameNodes.
+ // First run, half the load, with one failover.
+ runClientsWithFailover(1, NUMCLIENTS/2, NUMFILES/2);
+ // Second half, with fail back.
+ runClientsWithFailover(NUMCLIENTS/2 + 1, NUMCLIENTS, NUMFILES/2);
+ }
+
+ private void runClientsWithFailover(int clientStartId,
+ int numClients,
+ int numFiles)
+ throws Exception {
ExecutorService execService = Executors.newFixedThreadPool(2);
- clients = new ArrayList<>(NUMCLIENTS);
- for (int i = 1; i <= NUMCLIENTS; i++) {
+ clients = new ArrayList<>(numClients);
+ for (int i = clientStartId; i <= numClients; i++) {
DistributedFileSystem haClient =
(DistributedFileSystem) FileSystem.get(CONF);
- clients.add(new Worker(haClient, NUMFILES, "/testFile3FO_", i));
+ clients.add(new Worker(haClient, numFiles, "/testFile3FO_", i));
}
// Execute workers in threadpool with random failovers.
@@ -325,15 +242,18 @@ public class TestStateAlignmentContextWithHA {
execService.shutdown();
boolean finished = false;
+ failOver();
+
while (!finished) {
- failOver();
- finished = execService.awaitTermination(1L, TimeUnit.SECONDS);
+ finished = execService.awaitTermination(20L, TimeUnit.SECONDS);
}
// Validation.
for (Future<STATE> future : futures) {
assertThat(future.get(), is(STATE.SUCCESS));
}
+
+ clients.clear();
}
private ClientGSIContext getContext(int clientCreationIndex) {
@@ -341,7 +261,9 @@ public class TestStateAlignmentContextWithHA {
}
private void failOver() throws IOException {
+ LOG.info("Transitioning Active to Standby");
cluster.transitionToStandby(active);
+ LOG.info("Transitioning Standby to Active");
cluster.transitionToActive(standby);
int tempActive = active;
active = standby;
@@ -388,30 +310,36 @@ public class TestStateAlignmentContextWithHA {
@Override
public STATE call() {
+ int i = -1;
try {
- for (int i = 0; i < filesToMake; i++) {
- long preClientStateFO =
- getContext(nonce).getLastSeenStateId();
+ for (i = 0; i < filesToMake; i++) {
+ ClientGSIContext gsiContext = getContext(nonce);
+ long preClientStateFO = gsiContext.getLastSeenStateId();
// Write using HA client.
- Path path = new Path(filePath + nonce + i);
+ Path path = new Path(filePath + nonce + "_" + i);
DFSTestUtil.writeFile(client, path, "erk");
- long postClientStateFO =
- getContext(nonce).getLastSeenStateId();
+ long postClientStateFO = gsiContext.getLastSeenStateId();
// Write(s) should have increased state. Check for greater than.
- if (postClientStateFO <= preClientStateFO) {
- System.out.println("FAIL: Worker started with: " +
- preClientStateFO + ", but finished with: " + postClientStateFO);
+ if (postClientStateFO < 0 || postClientStateFO <= preClientStateFO) {
+ LOG.error("FAIL: Worker started with: {} , but finished with: {}",
+ preClientStateFO, postClientStateFO);
return STATE.FAIL;
}
+
+ if(i % (NUMFILES/10) == 0) {
+ LOG.info("Worker {} created {} files", nonce, i);
+ LOG.info("LastSeenStateId = {}", postClientStateFO);
+ }
}
- client.close();
return STATE.SUCCESS;
- } catch (IOException e) {
- System.out.println("ERROR: Worker failed with: " + e);
+ } catch (Exception e) {
+ LOG.error("ERROR: Worker failed with: ", e);
return STATE.ERROR;
+ } finally {
+ LOG.info("Worker {} created {} files", nonce, i);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org