You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2017/09/12 20:03:30 UTC
[45/50] [abbrv] hadoop git commit: HDFS-10687. Federation Membership
State Store internal API. Contributed by Jason Kace and Inigo Goiri.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f04b695f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java
new file mode 100644
index 0000000..2d74505
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver;
+
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMESERVICES;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.ROUTERS;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createNamenodeReport;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyException;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.clearRecords;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.newStateStore;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.waitStateStore;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test the basic {@link ActiveNamenodeResolver} functionality.
+ */
+public class TestNamenodeResolver {
+
+ private static StateStoreService stateStore;
+ private static ActiveNamenodeResolver namenodeResolver;
+
+ @BeforeClass
+ public static void create() throws Exception {
+
+ Configuration conf = getStateStoreConfiguration();
+
+ // Reduce expirations to 5 seconds
+ conf.setLong(
+ DFSConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS,
+ TimeUnit.SECONDS.toMillis(5));
+
+ stateStore = newStateStore(conf);
+ assertNotNull(stateStore);
+
+ namenodeResolver = new MembershipNamenodeResolver(conf, stateStore);
+ namenodeResolver.setRouterId(ROUTERS[0]);
+ }
+
+ @AfterClass
+ public static void destroy() throws Exception {
+ stateStore.stop();
+ stateStore.close();
+ }
+
+ @Before
+ public void setup() throws IOException, InterruptedException {
+ // Wait for state store to connect
+ stateStore.loadDriver();
+ waitStateStore(stateStore, 10000);
+
+ // Clear NN registrations
+ boolean cleared = clearRecords(stateStore, MembershipState.class);
+ assertTrue(cleared);
+ }
+
+ @Test
+ public void testStateStoreDisconnected() throws Exception {
+
+ // Add an entry to the store
+ NamenodeStatusReport report = createNamenodeReport(
+ NAMESERVICES[0], NAMENODES[0], HAServiceState.ACTIVE);
+ assertTrue(namenodeResolver.registerNamenode(report));
+
+ // Close the data store driver
+ stateStore.closeDriver();
+ assertFalse(stateStore.isDriverReady());
+
+ // Flush the caches
+ stateStore.refreshCaches(true);
+
+ // Verify commands fail due to no cached data and no state store
+ // connectivity.
+ List<? extends FederationNamenodeContext> nns =
+ namenodeResolver.getNamenodesForBlockPoolId(NAMESERVICES[0]);
+ assertNull(nns);
+
+ verifyException(namenodeResolver, "registerNamenode",
+ StateStoreUnavailableException.class,
+ new Class[] {NamenodeStatusReport.class}, new Object[] {report});
+ }
+
+ /**
+ * Verify the first registration on the resolver.
+ *
+ * @param nsId Nameservice identifier.
+ * @param nnId Namenode identifier within the nemeservice.
+ * @param resultsCount Number of results expected.
+ * @param state Expected state for the first one.
+ * @throws IOException If we cannot get the namenodes.
+ */
+ private void verifyFirstRegistration(String nsId, String nnId,
+ int resultsCount, FederationNamenodeServiceState state)
+ throws IOException {
+ List<? extends FederationNamenodeContext> namenodes =
+ namenodeResolver.getNamenodesForNameserviceId(nsId);
+ if (resultsCount == 0) {
+ assertNull(namenodes);
+ } else {
+ assertEquals(resultsCount, namenodes.size());
+ if (namenodes.size() > 0) {
+ FederationNamenodeContext namenode = namenodes.get(0);
+ assertEquals(state, namenode.getState());
+ assertEquals(nnId, namenode.getNamenodeId());
+ }
+ }
+ }
+
+ @Test
+ public void testRegistrationExpired()
+ throws InterruptedException, IOException {
+
+ // Populate the state store with a single NN element
+ // 1) ns0:nn0 - Active
+ // Wait for the entry to expire without heartbeating
+ // Verify the NN entry is not accessible once expired.
+ NamenodeStatusReport report = createNamenodeReport(
+ NAMESERVICES[0], NAMENODES[0], HAServiceState.ACTIVE);
+ assertTrue(namenodeResolver.registerNamenode(report));
+
+ // Load cache
+ stateStore.refreshCaches(true);
+
+ // Verify
+ verifyFirstRegistration(
+ NAMESERVICES[0], NAMENODES[0], 1,
+ FederationNamenodeServiceState.ACTIVE);
+
+ // Wait past expiration (set in conf to 5 seconds)
+ Thread.sleep(6000);
+ // Reload cache
+ stateStore.refreshCaches(true);
+
+ // Verify entry is now expired and is no longer in the cache
+ verifyFirstRegistration(
+ NAMESERVICES[0], NAMENODES[0], 0,
+ FederationNamenodeServiceState.ACTIVE);
+
+ // Heartbeat again, updates dateModified
+ assertTrue(namenodeResolver.registerNamenode(report));
+ // Reload cache
+ stateStore.refreshCaches(true);
+
+ // Verify updated entry is marked active again and accessible to RPC server
+ verifyFirstRegistration(
+ NAMESERVICES[0], NAMENODES[0], 1,
+ FederationNamenodeServiceState.ACTIVE);
+ }
+
+ @Test
+ public void testRegistrationNamenodeSelection()
+ throws InterruptedException, IOException {
+
+ // 1) ns0:nn0 - Active
+ // 2) ns0:nn1 - Standby (newest)
+ // Verify the selected entry is the active entry
+ assertTrue(namenodeResolver.registerNamenode(
+ createNamenodeReport(
+ NAMESERVICES[0], NAMENODES[0], HAServiceState.ACTIVE)));
+ Thread.sleep(100);
+ assertTrue(namenodeResolver.registerNamenode(
+ createNamenodeReport(
+ NAMESERVICES[0], NAMENODES[1], HAServiceState.STANDBY)));
+
+ stateStore.refreshCaches(true);
+
+ verifyFirstRegistration(
+ NAMESERVICES[0], NAMENODES[0], 2,
+ FederationNamenodeServiceState.ACTIVE);
+
+ // 1) ns0:nn0 - Expired (stale)
+ // 2) ns0:nn1 - Standby (newest)
+ // Verify the selected entry is the standby entry as the active entry is
+ // stale
+ assertTrue(namenodeResolver.registerNamenode(
+ createNamenodeReport(
+ NAMESERVICES[0], NAMENODES[0], HAServiceState.ACTIVE)));
+
+ // Expire active registration
+ Thread.sleep(6000);
+
+ // Refresh standby registration
+ assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
+ NAMESERVICES[0], NAMENODES[1], HAServiceState.STANDBY)));
+
+ // Verify that standby is selected (active is now expired)
+ stateStore.refreshCaches(true);
+ verifyFirstRegistration(NAMESERVICES[0], NAMENODES[1], 1,
+ FederationNamenodeServiceState.STANDBY);
+
+ // 1) ns0:nn0 - Active
+ // 2) ns0:nn1 - Unavailable (newest)
+ // Verify the selected entry is the active entry
+ assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
+ NAMESERVICES[0], NAMENODES[0], HAServiceState.ACTIVE)));
+ Thread.sleep(100);
+ assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
+ NAMESERVICES[0], NAMENODES[1], null)));
+ stateStore.refreshCaches(true);
+ verifyFirstRegistration(NAMESERVICES[0], NAMENODES[0], 2,
+ FederationNamenodeServiceState.ACTIVE);
+
+ // 1) ns0:nn0 - Unavailable (newest)
+ // 2) ns0:nn1 - Standby
+ // Verify the selected entry is the standby entry
+ assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
+ NAMESERVICES[0], NAMENODES[1], HAServiceState.STANDBY)));
+ Thread.sleep(1000);
+ assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
+ NAMESERVICES[0], NAMENODES[0], null)));
+
+ stateStore.refreshCaches(true);
+ verifyFirstRegistration(NAMESERVICES[0], NAMENODES[1], 2,
+ FederationNamenodeServiceState.STANDBY);
+
+ // 1) ns0:nn0 - Active (oldest)
+ // 2) ns0:nn1 - Standby
+ // 3) ns0:nn2 - Active (newest)
+ // Verify the selected entry is the newest active entry
+ assertTrue(namenodeResolver.registerNamenode(
+ createNamenodeReport(NAMESERVICES[0], NAMENODES[0], null)));
+ Thread.sleep(100);
+ assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
+ NAMESERVICES[0], NAMENODES[1], HAServiceState.STANDBY)));
+ Thread.sleep(100);
+ assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
+ NAMESERVICES[0], NAMENODES[2], HAServiceState.ACTIVE)));
+
+ stateStore.refreshCaches(true);
+ verifyFirstRegistration(NAMESERVICES[0], NAMENODES[2], 3,
+ FederationNamenodeServiceState.ACTIVE);
+
+ // 1) ns0:nn0 - Standby (oldest)
+ // 2) ns0:nn1 - Standby (newest)
+ // 3) ns0:nn2 - Standby
+ // Verify the selected entry is the newest standby entry
+ assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
+ NAMESERVICES[0], NAMENODES[0], HAServiceState.STANDBY)));
+ assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
+ NAMESERVICES[0], NAMENODES[2], HAServiceState.STANDBY)));
+ Thread.sleep(1500);
+ assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
+ NAMESERVICES[0], NAMENODES[1], HAServiceState.STANDBY)));
+
+ stateStore.refreshCaches(true);
+ verifyFirstRegistration(NAMESERVICES[0], NAMENODES[1], 3,
+ FederationNamenodeServiceState.STANDBY);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f04b695f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java
index fc5aebd..598b9cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java
@@ -34,9 +34,12 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileBaseImpl;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats;
import org.apache.hadoop.util.Time;
/**
@@ -96,7 +99,7 @@ public final class FederationStateStoreTestUtils {
* @throws IOException If it cannot create the State Store.
* @throws InterruptedException If we cannot wait for the store to start.
*/
- public static StateStoreService getStateStore(
+ public static StateStoreService newStateStore(
Configuration configuration) throws IOException, InterruptedException {
StateStoreService stateStore = new StateStoreService();
@@ -205,6 +208,7 @@ public final class FederationStateStoreTestUtils {
if (!synchronizeRecords(store, emptyList, recordClass)) {
return false;
}
+ store.refreshCaches(true);
return true;
}
@@ -229,4 +233,21 @@ public final class FederationStateStoreTestUtils {
}
return false;
}
+
+ public static MembershipState createMockRegistrationForNamenode(
+ String nameserviceId, String namenodeId,
+ FederationNamenodeServiceState state) throws IOException {
+ MembershipState entry = MembershipState.newInstance(
+ "routerId", nameserviceId, namenodeId, "clusterId", "test",
+ "0.0.0.0:0", "0.0.0.0:0", "0.0.0.0:0", "0.0.0.0:0", state, false);
+ MembershipStats stats = MembershipStats.newInstance();
+ stats.setNumOfActiveDatanodes(100);
+ stats.setNumOfDeadDatanodes(10);
+ stats.setNumOfDecommissioningDatanodes(20);
+ stats.setNumOfDecomActiveDatanodes(15);
+ stats.setNumOfDecomDeadDatanodes(5);
+ stats.setNumOfBlocks(10);
+ entry.setStats(stats);
+ return entry;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f04b695f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreBase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreBase.java
new file mode 100644
index 0000000..7f6704e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreBase.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store;
+
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.newStateStore;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.waitStateStore;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+/**
+ * Test the basic {@link StateStoreService} {@link MountTableStore}
+ * functionality.
+ */
+public class TestStateStoreBase {
+
+ private static StateStoreService stateStore;
+ private static Configuration conf;
+
+ protected static StateStoreService getStateStore() {
+ return stateStore;
+ }
+
+ protected static Configuration getConf() {
+ return conf;
+ }
+
+ @BeforeClass
+ public static void createBase() throws IOException, InterruptedException {
+
+ conf = getStateStoreConfiguration();
+
+ // Disable auto-reconnect to data store
+ conf.setLong(DFSConfigKeys.FEDERATION_STORE_CONNECTION_TEST_MS,
+ TimeUnit.HOURS.toMillis(1));
+ }
+
+ @AfterClass
+ public static void destroyBase() throws Exception {
+ if (stateStore != null) {
+ stateStore.stop();
+ stateStore.close();
+ stateStore = null;
+ }
+ }
+
+ @Before
+ public void setupBase() throws IOException, InterruptedException,
+ InstantiationException, IllegalAccessException {
+ if (stateStore == null) {
+ stateStore = newStateStore(conf);
+ assertNotNull(stateStore);
+ }
+ // Wait for state store to connect
+ stateStore.loadDriver();
+ waitStateStore(stateStore, TimeUnit.SECONDS.toMillis(10));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f04b695f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMembershipState.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMembershipState.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMembershipState.java
new file mode 100644
index 0000000..26f081b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMembershipState.java
@@ -0,0 +1,463 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store;
+
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMESERVICES;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.ROUTERS;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyException;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.clearRecords;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.createMockRegistrationForNamenode;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.synchronizeRecords;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationRequest;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.apache.hadoop.util.Time;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test the basic {@link MembershipStore} membership functionality.
+ */
+public class TestStateStoreMembershipState extends TestStateStoreBase {
+
+ private static MembershipStore membershipStore;
+
+ @BeforeClass
+ public static void create() {
+ // Reduce expirations to 5 seconds
+ getConf().setLong(
+ DFSConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS,
+ TimeUnit.SECONDS.toMillis(5));
+ }
+
+ @Before
+ public void setup() throws IOException, InterruptedException {
+
+ membershipStore =
+ getStateStore().getRegisteredRecordStore(MembershipStore.class);
+
+ // Clear NN registrations
+ assertTrue(clearRecords(getStateStore(), MembershipState.class));
+ }
+
+ @Test
+ public void testNamenodeStateOverride() throws Exception {
+ // Populate the state store
+ // 1) ns0:nn0 - Standby
+ String ns = "ns0";
+ String nn = "nn0";
+ MembershipState report = createRegistration(
+ ns, nn, ROUTERS[1], FederationNamenodeServiceState.STANDBY);
+ assertTrue(namenodeHeartbeat(report));
+
+ // Load data into cache and calculate quorum
+ assertTrue(getStateStore().loadCache(MembershipStore.class, true));
+
+ MembershipState existingState = getNamenodeRegistration(ns, nn);
+ assertEquals(
+ FederationNamenodeServiceState.STANDBY, existingState.getState());
+
+ // Override cache
+ UpdateNamenodeRegistrationRequest request =
+ UpdateNamenodeRegistrationRequest.newInstance(
+ ns, nn, FederationNamenodeServiceState.ACTIVE);
+ assertTrue(membershipStore.updateNamenodeRegistration(request).getResult());
+
+ MembershipState newState = getNamenodeRegistration(ns, nn);
+ assertEquals(FederationNamenodeServiceState.ACTIVE, newState.getState());
+ }
+
+ @Test
+ public void testStateStoreDisconnected() throws Exception {
+
+ // Close the data store driver
+ getStateStore().closeDriver();
+ assertFalse(getStateStore().isDriverReady());
+
+ NamenodeHeartbeatRequest hbRequest = NamenodeHeartbeatRequest.newInstance();
+ hbRequest.setNamenodeMembership(
+ createMockRegistrationForNamenode(
+ "test", "test", FederationNamenodeServiceState.UNAVAILABLE));
+ verifyException(membershipStore, "namenodeHeartbeat",
+ StateStoreUnavailableException.class,
+ new Class[] {NamenodeHeartbeatRequest.class},
+ new Object[] {hbRequest });
+
+ // Information from cache, no exception should be triggered for these
+ // TODO - should cached info expire at some point?
+ GetNamenodeRegistrationsRequest getRequest =
+ GetNamenodeRegistrationsRequest.newInstance();
+ verifyException(membershipStore,
+ "getNamenodeRegistrations", null,
+ new Class[] {GetNamenodeRegistrationsRequest.class},
+ new Object[] {getRequest});
+
+ verifyException(membershipStore,
+ "getExpiredNamenodeRegistrations", null,
+ new Class[] {GetNamenodeRegistrationsRequest.class},
+ new Object[] {getRequest});
+
+ UpdateNamenodeRegistrationRequest overrideRequest =
+ UpdateNamenodeRegistrationRequest.newInstance();
+ verifyException(membershipStore,
+ "updateNamenodeRegistration", null,
+ new Class[] {UpdateNamenodeRegistrationRequest.class},
+ new Object[] {overrideRequest});
+ }
+
+ private void registerAndLoadRegistrations(
+ List<MembershipState> registrationList) throws IOException {
+ // Populate
+ assertTrue(synchronizeRecords(
+ getStateStore(), registrationList, MembershipState.class));
+
+ // Load into cache
+ assertTrue(getStateStore().loadCache(MembershipStore.class, true));
+ }
+
+ private MembershipState createRegistration(String ns, String nn,
+ String router, FederationNamenodeServiceState state) throws IOException {
+ MembershipState record = MembershipState.newInstance(
+ router, ns,
+ nn, "testcluster", "testblock-" + ns, "testrpc-"+ ns + nn,
+ "testservice-"+ ns + nn, "testlifeline-"+ ns + nn,
+ "testweb-" + ns + nn, state, false);
+ return record;
+ }
+
+ @Test
+ public void testRegistrationMajorityQuorum()
+ throws InterruptedException, IOException {
+
+ // Populate the state store with a set of non-matching elements
+ // 1) ns0:nn0 - Standby (newest)
+ // 2) ns0:nn0 - Active (oldest)
+ // 3) ns0:nn0 - Active (2nd oldest)
+ // 4) ns0:nn0 - Active (3nd oldest element, newest active element)
+ // Verify the selected entry is the newest majority opinion (4)
+ String ns = "ns0";
+ String nn = "nn0";
+
+ // Active - oldest
+ MembershipState report = createRegistration(
+ ns, nn, ROUTERS[1], FederationNamenodeServiceState.ACTIVE);
+ assertTrue(namenodeHeartbeat(report));
+ Thread.sleep(1000);
+
+ // Active - 2nd oldest
+ report = createRegistration(
+ ns, nn, ROUTERS[2], FederationNamenodeServiceState.ACTIVE);
+ assertTrue(namenodeHeartbeat(report));
+ Thread.sleep(1000);
+
+ // Active - 3rd oldest, newest active element
+ report = createRegistration(
+ ns, nn, ROUTERS[3], FederationNamenodeServiceState.ACTIVE);
+ assertTrue(namenodeHeartbeat(report));
+
+ // standby - newest overall
+ report = createRegistration(
+ ns, nn, ROUTERS[0], FederationNamenodeServiceState.STANDBY);
+ assertTrue(namenodeHeartbeat(report));
+
+ // Load and calculate quorum
+ assertTrue(getStateStore().loadCache(MembershipStore.class, true));
+
+ // Verify quorum entry
+ MembershipState quorumEntry = getNamenodeRegistration(
+ report.getNameserviceId(), report.getNamenodeId());
+ assertNotNull(quorumEntry);
+ assertEquals(quorumEntry.getRouterId(), ROUTERS[3]);
+ }
+
+ @Test
+ public void testRegistrationQuorumExcludesExpired()
+ throws InterruptedException, IOException {
+
+ // Populate the state store with some expired entries and verify the expired
+ // entries are ignored.
+ // 1) ns0:nn0 - Active
+ // 2) ns0:nn0 - Expired
+ // 3) ns0:nn0 - Expired
+ // 4) ns0:nn0 - Expired
+ // Verify the selected entry is the active entry
+ List<MembershipState> registrationList = new ArrayList<>();
+ String ns = "ns0";
+ String nn = "nn0";
+ String rpcAddress = "testrpcaddress";
+ String serviceAddress = "testserviceaddress";
+ String lifelineAddress = "testlifelineaddress";
+ String blockPoolId = "testblockpool";
+ String clusterId = "testcluster";
+ String webAddress = "testwebaddress";
+ boolean safemode = false;
+
+ // Active
+ MembershipState record = MembershipState.newInstance(
+ ROUTERS[0], ns, nn, clusterId, blockPoolId,
+ rpcAddress, serviceAddress, lifelineAddress, webAddress,
+ FederationNamenodeServiceState.ACTIVE, safemode);
+ registrationList.add(record);
+
+ // Expired
+ record = MembershipState.newInstance(
+ ROUTERS[1], ns, nn, clusterId, blockPoolId,
+ rpcAddress, serviceAddress, lifelineAddress, webAddress,
+ FederationNamenodeServiceState.EXPIRED, safemode);
+ registrationList.add(record);
+
+ // Expired
+ record = MembershipState.newInstance(
+ ROUTERS[2], ns, nn, clusterId, blockPoolId,
+ rpcAddress, serviceAddress, lifelineAddress, webAddress,
+ FederationNamenodeServiceState.EXPIRED, safemode);
+ registrationList.add(record);
+
+ // Expired
+ record = MembershipState.newInstance(
+ ROUTERS[3], ns, nn, clusterId, blockPoolId,
+ rpcAddress, serviceAddress, lifelineAddress, webAddress,
+ FederationNamenodeServiceState.EXPIRED, safemode);
+ registrationList.add(record);
+ registerAndLoadRegistrations(registrationList);
+
+ // Verify quorum entry chooses active membership
+ MembershipState quorumEntry = getNamenodeRegistration(
+ record.getNameserviceId(), record.getNamenodeId());
+ assertNotNull(quorumEntry);
+ assertEquals(ROUTERS[0], quorumEntry.getRouterId());
+ }
+
+ @Test
+ public void testRegistrationQuorumAllExpired() throws IOException {
+
+ // 1) ns0:nn0 - Expired (oldest)
+ // 2) ns0:nn0 - Expired
+ // 3) ns0:nn0 - Expired
+ // 4) ns0:nn0 - Expired
+ // Verify no entry is either selected or cached
+ List<MembershipState> registrationList = new ArrayList<>();
+ String ns = NAMESERVICES[0];
+ String nn = NAMENODES[0];
+ String rpcAddress = "testrpcaddress";
+ String serviceAddress = "testserviceaddress";
+ String lifelineAddress = "testlifelineaddress";
+ String blockPoolId = "testblockpool";
+ String clusterId = "testcluster";
+ String webAddress = "testwebaddress";
+ boolean safemode = false;
+ long startingTime = Time.now();
+
+ // Expired
+ MembershipState record = MembershipState.newInstance(
+ ROUTERS[0], ns, nn, clusterId, blockPoolId,
+ rpcAddress, webAddress, lifelineAddress, webAddress,
+ FederationNamenodeServiceState.EXPIRED, safemode);
+ record.setDateModified(startingTime - 10000);
+ registrationList.add(record);
+
+ // Expired
+ record = MembershipState.newInstance(
+ ROUTERS[1], ns, nn, clusterId, blockPoolId,
+ rpcAddress, serviceAddress, lifelineAddress, webAddress,
+ FederationNamenodeServiceState.EXPIRED, safemode);
+ record.setDateModified(startingTime);
+ registrationList.add(record);
+
+ // Expired
+ record = MembershipState.newInstance(
+ ROUTERS[2], ns, nn, clusterId, blockPoolId,
+ rpcAddress, serviceAddress, lifelineAddress, webAddress,
+ FederationNamenodeServiceState.EXPIRED, safemode);
+ record.setDateModified(startingTime);
+ registrationList.add(record);
+
+ // Expired
+ record = MembershipState.newInstance(
+ ROUTERS[3], ns, nn, clusterId, blockPoolId,
+ rpcAddress, serviceAddress, lifelineAddress, webAddress,
+ FederationNamenodeServiceState.EXPIRED, safemode);
+ record.setDateModified(startingTime);
+ registrationList.add(record);
+
+ registerAndLoadRegistrations(registrationList);
+
+ // Verify no entry is found for this nameservice
+ assertNull(getNamenodeRegistration(
+ record.getNameserviceId(), record.getNamenodeId()));
+ }
+
+ @Test
+ public void testRegistrationNoQuorum()
+ throws InterruptedException, IOException {
+
+ // Populate the state store with a set of non-matching elements
+ // 1) ns0:nn0 - Standby (newest)
+ // 2) ns0:nn0 - Standby (oldest)
+ // 3) ns0:nn0 - Active (2nd oldest)
+ // 4) ns0:nn0 - Active (3nd oldest element, newest active element)
+ // Verify the selected entry is the newest entry (1)
+ MembershipState report1 = createRegistration(
+ NAMESERVICES[0], NAMENODES[0], ROUTERS[1],
+ FederationNamenodeServiceState.STANDBY);
+ assertTrue(namenodeHeartbeat(report1));
+ Thread.sleep(100);
+ MembershipState report2 = createRegistration(
+ NAMESERVICES[0], NAMENODES[0], ROUTERS[2],
+ FederationNamenodeServiceState.ACTIVE);
+ assertTrue(namenodeHeartbeat(report2));
+ Thread.sleep(100);
+ MembershipState report3 = createRegistration(
+ NAMESERVICES[0], NAMENODES[0], ROUTERS[3],
+ FederationNamenodeServiceState.ACTIVE);
+ assertTrue(namenodeHeartbeat(report3));
+ Thread.sleep(100);
+ MembershipState report4 = createRegistration(
+ NAMESERVICES[0], NAMENODES[0], ROUTERS[0],
+ FederationNamenodeServiceState.STANDBY);
+ assertTrue(namenodeHeartbeat(report4));
+
+ // Load and calculate quorum
+ assertTrue(getStateStore().loadCache(MembershipStore.class, true));
+
+ // Verify quorum entry uses the newest data, even though it is standby
+ MembershipState quorumEntry = getNamenodeRegistration(
+ report1.getNameserviceId(), report1.getNamenodeId());
+ assertNotNull(quorumEntry);
+ assertEquals(ROUTERS[0], quorumEntry.getRouterId());
+ assertEquals(
+ FederationNamenodeServiceState.STANDBY, quorumEntry.getState());
+ }
+
+ @Test
+ public void testRegistrationExpired()
+ throws InterruptedException, IOException {
+
+ // Populate the state store with a single NN element
+ // 1) ns0:nn0 - Active
+ // Wait for the entry to expire without heartbeating
+ // Verify the NN entry is populated as EXPIRED internally in the state store
+
+ MembershipState report = createRegistration(
+ NAMESERVICES[0], NAMENODES[0], ROUTERS[0],
+ FederationNamenodeServiceState.ACTIVE);
+ assertTrue(namenodeHeartbeat(report));
+
+ // Load cache
+ assertTrue(getStateStore().loadCache(MembershipStore.class, true));
+
+ // Verify quorum and entry
+ MembershipState quorumEntry = getNamenodeRegistration(
+ report.getNameserviceId(), report.getNamenodeId());
+ assertNotNull(quorumEntry);
+ assertEquals(ROUTERS[0], quorumEntry.getRouterId());
+ assertEquals(FederationNamenodeServiceState.ACTIVE, quorumEntry.getState());
+
+ // Wait past expiration (set in conf to 5 seconds)
+ Thread.sleep(6000);
+ // Reload cache
+ assertTrue(getStateStore().loadCache(MembershipStore.class, true));
+
+ // Verify entry is now expired and is no longer in the cache
+ quorumEntry = getNamenodeRegistration(NAMESERVICES[0], NAMENODES[0]);
+ assertNull(quorumEntry);
+
+ // Verify entry is now expired and can't be used by RPC service
+ quorumEntry = getNamenodeRegistration(
+ report.getNameserviceId(), report.getNamenodeId());
+ assertNull(quorumEntry);
+
+ // Heartbeat again, updates dateModified
+ assertTrue(namenodeHeartbeat(report));
+ // Reload cache
+ assertTrue(getStateStore().loadCache(MembershipStore.class, true));
+
+ // Verify updated entry marked as active and is accessible to RPC server
+ quorumEntry = getNamenodeRegistration(
+ report.getNameserviceId(), report.getNamenodeId());
+ assertNotNull(quorumEntry);
+ assertEquals(ROUTERS[0], quorumEntry.getRouterId());
+ assertEquals(FederationNamenodeServiceState.ACTIVE, quorumEntry.getState());
+ }
+
+ /**
+ * Get a single namenode membership record from the store.
+ *
+ * @param nsId The HDFS nameservice ID to search for
+ * @param nnId The HDFS namenode ID to search for
+ * @return The single NamenodeMembershipRecord that matches the query or null
+ * if not found.
+ * @throws IOException if the query could not be executed.
+ */
+ private MembershipState getNamenodeRegistration(
+ final String nsId, final String nnId) throws IOException {
+
+ MembershipState partial = MembershipState.newInstance();
+ partial.setNameserviceId(nsId);
+ partial.setNamenodeId(nnId);
+ GetNamenodeRegistrationsRequest request =
+ GetNamenodeRegistrationsRequest.newInstance(partial);
+ GetNamenodeRegistrationsResponse response =
+ membershipStore.getNamenodeRegistrations(request);
+
+ List<MembershipState> results = response.getNamenodeMemberships();
+ if (results != null && results.size() == 1) {
+ MembershipState record = results.get(0);
+ return record;
+ }
+ return null;
+ }
+
+ /**
+ * Register a namenode heartbeat with the state store.
+ *
+ * @param store FederationMembershipStateStore instance to retrieve the
+ * membership data records.
+ * @param namenode A fully populated namenode membership record to be
+ * committed to the data store.
+ * @return True if successful, false otherwise.
+ * @throws IOException if the state store query could not be performed.
+ */
+ private boolean namenodeHeartbeat(MembershipState namenode)
+ throws IOException {
+
+ NamenodeHeartbeatRequest request =
+ NamenodeHeartbeatRequest.newInstance(namenode);
+ NamenodeHeartbeatResponse response =
+ membershipStore.namenodeHeartbeat(request);
+ return response.getResult();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f04b695f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
index 7f0b36a..dc51ee9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
@@ -31,11 +31,14 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Random;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
import org.apache.hadoop.hdfs.server.federation.store.records.Query;
import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
import org.junit.AfterClass;
@@ -54,6 +57,8 @@ public class TestStateStoreDriverBase {
private static StateStoreService stateStore;
private static Configuration conf;
+ private static final Random RANDOM = new Random();
+
/**
* Get the State Store driver.
@@ -78,29 +83,47 @@ public class TestStateStoreDriverBase {
*/
public static void getStateStore(Configuration config) throws Exception {
conf = config;
- stateStore = FederationStateStoreTestUtils.getStateStore(conf);
+ stateStore = FederationStateStoreTestUtils.newStateStore(conf);
+ }
+
+ private String generateRandomString() {
+ String randomString = "/randomString-" + RANDOM.nextInt();
+ return randomString;
+ }
+
+ @SuppressWarnings("rawtypes")
+ private <T extends Enum> T generateRandomEnum(Class<T> enumClass) {
+ int x = RANDOM.nextInt(enumClass.getEnumConstants().length);
+ T data = enumClass.getEnumConstants()[x];
+ return data;
}
+ @SuppressWarnings("unchecked")
private <T extends BaseRecord> T generateFakeRecord(Class<T> recordClass)
throws IllegalArgumentException, IllegalAccessException, IOException {
- // TODO add record
+ if (recordClass == MembershipState.class) {
+ return (T) MembershipState.newInstance(generateRandomString(),
+ generateRandomString(), generateRandomString(),
+ generateRandomString(), generateRandomString(),
+ generateRandomString(), generateRandomString(),
+ generateRandomString(), generateRandomString(),
+ generateRandomEnum(FederationNamenodeServiceState.class), false);
+ }
+
return null;
}
/**
* Validate if a record is the same.
*
- * @param original
- * @param committed
+ * @param original Original record.
+ * @param committed Committed record.
* @param assertEquals Assert if the records are equal or just return.
- * @return
- * @throws IllegalArgumentException
- * @throws IllegalAccessException
+ * @return If the record is successfully validated.
*/
private boolean validateRecord(
- BaseRecord original, BaseRecord committed, boolean assertEquals)
- throws IllegalArgumentException, IllegalAccessException {
+ BaseRecord original, BaseRecord committed, boolean assertEquals) {
boolean ret = true;
@@ -131,7 +154,7 @@ public class TestStateStoreDriverBase {
}
public static void removeAll(StateStoreDriver driver) throws IOException {
- // TODO add records to remove
+ driver.removeAll(MembershipState.class);
}
public <T extends BaseRecord> void testInsert(
@@ -139,17 +162,20 @@ public class TestStateStoreDriverBase {
throws IllegalArgumentException, IllegalAccessException, IOException {
assertTrue(driver.removeAll(recordClass));
- QueryResult<T> records = driver.get(recordClass);
- assertTrue(records.getRecords().isEmpty());
+ QueryResult<T> queryResult0 = driver.get(recordClass);
+ List<T> records0 = queryResult0.getRecords();
+ assertTrue(records0.isEmpty());
// Insert single
BaseRecord record = generateFakeRecord(recordClass);
driver.put(record, true, false);
// Verify
- records = driver.get(recordClass);
- assertEquals(1, records.getRecords().size());
- validateRecord(record, records.getRecords().get(0), true);
+ QueryResult<T> queryResult1 = driver.get(recordClass);
+ List<T> records1 = queryResult1.getRecords();
+ assertEquals(1, records1.size());
+ T record0 = records1.get(0);
+ validateRecord(record, record0, true);
// Insert multiple
List<T> insertList = new ArrayList<>();
@@ -160,8 +186,9 @@ public class TestStateStoreDriverBase {
driver.putAll(insertList, true, false);
// Verify
- records = driver.get(recordClass);
- assertEquals(11, records.getRecords().size());
+ QueryResult<T> queryResult2 = driver.get(recordClass);
+ List<T> records2 = queryResult2.getRecords();
+ assertEquals(11, records2.size());
}
public <T extends BaseRecord> void testFetchErrors(StateStoreDriver driver,
@@ -319,23 +346,23 @@ public class TestStateStoreDriverBase {
public void testInsert(StateStoreDriver driver)
throws IllegalArgumentException, IllegalAccessException, IOException {
- // TODO add records
+ testInsert(driver, MembershipState.class);
}
public void testPut(StateStoreDriver driver)
throws IllegalArgumentException, ReflectiveOperationException,
IOException, SecurityException {
- // TODO add records
+ testPut(driver, MembershipState.class);
}
public void testRemove(StateStoreDriver driver)
throws IllegalArgumentException, IllegalAccessException, IOException {
- // TODO add records
+ testRemove(driver, MembershipState.class);
}
public void testFetchErrors(StateStoreDriver driver)
throws IllegalArgumentException, IllegalAccessException, IOException {
- // TODO add records
+ testFetchErrors(driver, MembershipState.class);
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f04b695f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMembershipState.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMembershipState.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMembershipState.java
new file mode 100644
index 0000000..d922414
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMembershipState.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.records;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
+import org.junit.Test;
+
+/**
+ * Test the Membership State records.
+ */
+public class TestMembershipState {
+
+ private static final String ROUTER = "router";
+ private static final String NAMESERVICE = "nameservice";
+ private static final String NAMENODE = "namenode";
+ private static final String CLUSTER_ID = "cluster";
+ private static final String BLOCKPOOL_ID = "blockpool";
+ private static final String RPC_ADDRESS = "rpcaddress";
+ private static final String SERVICE_ADDRESS = "serviceaddress";
+ private static final String LIFELINE_ADDRESS = "lifelineaddress";
+ private static final String WEB_ADDRESS = "webaddress";
+ private static final boolean SAFE_MODE = false;
+
+ private static final long DATE_CREATED = 100;
+ private static final long DATE_MODIFIED = 200;
+
+ private static final long NUM_BLOCKS = 300;
+ private static final long NUM_FILES = 400;
+ private static final int NUM_DEAD = 500;
+ private static final int NUM_ACTIVE = 600;
+ private static final int NUM_DECOM = 700;
+ private static final int NUM_DECOM_ACTIVE = 800;
+ private static final int NUM_DECOM_DEAD = 900;
+ private static final long NUM_BLOCK_MISSING = 1000;
+
+ private static final long TOTAL_SPACE = 1100;
+ private static final long AVAILABLE_SPACE = 1200;
+
+ private static final FederationNamenodeServiceState STATE =
+ FederationNamenodeServiceState.ACTIVE;
+
+ private MembershipState createRecord() throws IOException {
+
+ MembershipState record = MembershipState.newInstance(
+ ROUTER, NAMESERVICE, NAMENODE, CLUSTER_ID,
+ BLOCKPOOL_ID, RPC_ADDRESS, SERVICE_ADDRESS, LIFELINE_ADDRESS,
+ WEB_ADDRESS, STATE, SAFE_MODE);
+ record.setDateCreated(DATE_CREATED);
+ record.setDateModified(DATE_MODIFIED);
+
+ MembershipStats stats = MembershipStats.newInstance();
+ stats.setNumOfBlocks(NUM_BLOCKS);
+ stats.setNumOfFiles(NUM_FILES);
+ stats.setNumOfActiveDatanodes(NUM_ACTIVE);
+ stats.setNumOfDeadDatanodes(NUM_DEAD);
+ stats.setNumOfDecommissioningDatanodes(NUM_DECOM);
+ stats.setNumOfDecomActiveDatanodes(NUM_DECOM_ACTIVE);
+ stats.setNumOfDecomDeadDatanodes(NUM_DECOM_DEAD);
+ stats.setNumOfBlocksMissing(NUM_BLOCK_MISSING);
+ stats.setTotalSpace(TOTAL_SPACE);
+ stats.setAvailableSpace(AVAILABLE_SPACE);
+ record.setStats(stats);
+ return record;
+ }
+
+ private void validateRecord(MembershipState record) throws IOException {
+
+ assertEquals(ROUTER, record.getRouterId());
+ assertEquals(NAMESERVICE, record.getNameserviceId());
+ assertEquals(CLUSTER_ID, record.getClusterId());
+ assertEquals(BLOCKPOOL_ID, record.getBlockPoolId());
+ assertEquals(RPC_ADDRESS, record.getRpcAddress());
+ assertEquals(WEB_ADDRESS, record.getWebAddress());
+ assertEquals(STATE, record.getState());
+ assertEquals(SAFE_MODE, record.getIsSafeMode());
+ assertEquals(DATE_CREATED, record.getDateCreated());
+ assertEquals(DATE_MODIFIED, record.getDateModified());
+
+ MembershipStats stats = record.getStats();
+ assertEquals(NUM_BLOCKS, stats.getNumOfBlocks());
+ assertEquals(NUM_FILES, stats.getNumOfFiles());
+ assertEquals(NUM_ACTIVE, stats.getNumOfActiveDatanodes());
+ assertEquals(NUM_DEAD, stats.getNumOfDeadDatanodes());
+ assertEquals(NUM_DECOM, stats.getNumOfDecommissioningDatanodes());
+ assertEquals(NUM_DECOM_ACTIVE, stats.getNumOfDecomActiveDatanodes());
+ assertEquals(NUM_DECOM_DEAD, stats.getNumOfDecomDeadDatanodes());
+ assertEquals(TOTAL_SPACE, stats.getTotalSpace());
+ assertEquals(AVAILABLE_SPACE, stats.getAvailableSpace());
+ }
+
+ @Test
+ public void testGetterSetter() throws IOException {
+ MembershipState record = createRecord();
+ validateRecord(record);
+ }
+
+ @Test
+ public void testSerialization() throws IOException {
+
+ MembershipState record = createRecord();
+
+ StateStoreSerializer serializer = StateStoreSerializer.getSerializer();
+ String serializedString = serializer.serializeString(record);
+ MembershipState newRecord =
+ serializer.deserialize(serializedString, MembershipState.class);
+
+ validateRecord(newRecord);
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org