Posted to commits@hbase.apache.org by dd...@apache.org on 2015/02/06 16:48:42 UTC
[1/2] hbase git commit: HBASE-11568. Async WAL replication for region replicas (Enis Soztutar)
Repository: hbase
Updated Branches:
refs/heads/branch-1 fb867f795 -> 055f5a95e
http://git-wip-us.apache.org/repos/asf/hbase/blob/055f5a95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
new file mode 100644
index 0000000..76945d7
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
@@ -0,0 +1,345 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.RpcRetryingCaller;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.log4j.Level;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Tests RegionReplicaReplicationEndpoint class by setting up region replicas and verifying
+ * async wal replication replays the edits to the secondary region in various scenarios.
+ */
+@Category(MediumTests.class)
+public class TestRegionReplicaReplicationEndpoint {
+
+ private static final Log LOG = LogFactory.getLog(TestRegionReplicaReplicationEndpoint.class);
+
+ static {
+ ((Log4JLogger)RpcRetryingCaller.LOG).getLogger().setLevel(Level.ALL);
+ }
+
+ private static final int NB_SERVERS = 2;
+
+ private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ Configuration conf = HTU.getConfiguration();
+ conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
+ conf.setInt("replication.source.size.capacity", 10240);
+ conf.setLong("replication.source.sleepforretries", 100);
+ conf.setInt("hbase.regionserver.maxlogs", 10);
+ conf.setLong("hbase.master.logcleaner.ttl", 10);
+ conf.setInt("zookeeper.recovery.retry", 1);
+ conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
+ conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
+ conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
+ conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
+ conf.setInt("replication.stats.thread.period.seconds", 5);
+ conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
+ conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3); // fewer retries are needed
+ conf.setInt("hbase.client.serverside.retries.multiplier", 1);
+
+ HTU.startMiniCluster(NB_SERVERS);
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ HTU.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testRegionReplicaReplicationPeerIsCreated() throws IOException, ReplicationException {
+ // create a table with region replicas. Check whether the replication peer is created
+ // and replication started.
+ ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());
+ String peerId = "region_replica_replication";
+
+ if (admin.getPeerConfig(peerId) != null) {
+ admin.removePeer(peerId);
+ }
+
+ HTableDescriptor htd = HTU.createTableDescriptor(
+ "testReplicationPeerIsCreated_no_region_replicas");
+ HTU.getHBaseAdmin().createTable(htd);
+ ReplicationPeerConfig peerConfig = admin.getPeerConfig(peerId);
+ assertNull(peerConfig);
+
+ htd = HTU.createTableDescriptor("testReplicationPeerIsCreated");
+ htd.setRegionReplication(2);
+ HTU.getHBaseAdmin().createTable(htd);
+
+ // assert peer configuration is correct
+ peerConfig = admin.getPeerConfig(peerId);
+ assertNotNull(peerConfig);
+ assertEquals(peerConfig.getClusterKey(), ZKUtil.getZooKeeperClusterKey(HTU.getConfiguration()));
+ assertEquals(peerConfig.getReplicationEndpointImpl(),
+ RegionReplicaReplicationEndpoint.class.getName());
+ admin.close();
+ }
+
+
+ public void testRegionReplicaReplication(int regionReplication) throws Exception {
+ // test region replica replication. Create a table with a single region, write some data,
+ // and ensure that the data is replicated to the secondary regions
+ TableName tableName = TableName.valueOf("testRegionReplicaReplicationWithReplicas_"
+ + regionReplication);
+ HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString());
+ htd.setRegionReplication(regionReplication);
+ HTU.getHBaseAdmin().createTable(htd);
+ TableName tableNameNoReplicas =
+ TableName.valueOf("testRegionReplicaReplicationWithReplicas_NO_REPLICAS");
+ HTU.deleteTableIfAny(tableNameNoReplicas);
+ HTU.createTable(tableNameNoReplicas, HBaseTestingUtility.fam1);
+
+ HConnection connection = HConnectionManager.createConnection(HTU.getConfiguration());
+ HTableInterface table = connection.getTable(tableName);
+ HTableInterface tableNoReplicas = connection.getTable(tableNameNoReplicas);
+
+ try {
+ // load some data to the non-replicated table
+ HTU.loadNumericRows(tableNoReplicas, HBaseTestingUtility.fam1, 6000, 7000);
+
+ // load the data to the table
+ HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000);
+
+ verifyReplication(tableName, regionReplication, 0, 1000);
+
+ } finally {
+ table.close();
+ tableNoReplicas.close();
+ HTU.deleteTableIfAny(tableNameNoReplicas);
+ connection.close();
+ }
+ }
+
+ private void verifyReplication(TableName tableName, int regionReplication,
+ final int startRow, final int endRow) throws Exception {
+ // find the regions
+ final HRegion[] regions = new HRegion[regionReplication];
+
+ for (int i=0; i < NB_SERVERS; i++) {
+ HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
+ List<HRegion> onlineRegions = rs.getOnlineRegions(tableName);
+ for (HRegion region : onlineRegions) {
+ regions[region.getRegionInfo().getReplicaId()] = region;
+ }
+ }
+
+ for (HRegion region : regions) {
+ assertNotNull(region);
+ }
+
+ for (int i = 1; i < regionReplication; i++) {
+ final HRegion region = regions[i];
+ // wait until all the data is replicated to all secondary regions
+ Waiter.waitFor(HTU.getConfiguration(), 60000, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ LOG.info("verifying replication for region replica:" + region.getRegionInfo());
+ try {
+ HTU.verifyNumericRows(region, HBaseTestingUtility.fam1, startRow, endRow);
+ } catch(Throwable ex) {
+ LOG.warn("Verification from secondary region is not complete yet. Got:" + ex
+ + " " + ex.getMessage());
+ // still wait
+ return false;
+ }
+ return true;
+ }
+ });
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testRegionReplicaReplicationWith2Replicas() throws Exception {
+ testRegionReplicaReplication(2);
+ }
+
+ @Test(timeout = 60000)
+ public void testRegionReplicaReplicationWith3Replicas() throws Exception {
+ testRegionReplicaReplication(3);
+ }
+
+ @Test(timeout = 60000)
+ public void testRegionReplicaReplicationWith10Replicas() throws Exception {
+ testRegionReplicaReplication(10);
+ }
+
+ @Test (timeout = 60000)
+ public void testRegionReplicaReplicationForFlushAndCompaction() throws Exception {
+ // Tests a table with region replication 3. Writes some data, and causes flushes and
+ // compactions. Verifies that the data is readable from the replicas. Note that this
+ // does not test whether the replicas actually pick up flushed files and apply compaction
+ // to their stores
+ int regionReplication = 3;
+ TableName tableName = TableName.valueOf("testRegionReplicaReplicationForFlushAndCompaction");
+ HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString());
+ htd.setRegionReplication(regionReplication);
+ HTU.getHBaseAdmin().createTable(htd);
+
+
+ HConnection connection = HConnectionManager.createConnection(HTU.getConfiguration());
+ HTableInterface table = connection.getTable(tableName);
+
+ try {
+ // load the data to the table
+
+ for (int i = 0; i < 6000; i += 1000) {
+ LOG.info("Writing data from " + i + " to " + (i+1000));
+ HTU.loadNumericRows(table, HBaseTestingUtility.fam1, i, i+1000);
+ LOG.info("flushing table");
+ HTU.flush(tableName);
+ LOG.info("compacting table");
+ HTU.compact(tableName, false);
+ }
+
+ verifyReplication(tableName, regionReplication, 0, 6000);
+ } finally {
+ table.close();
+ connection.close();
+ }
+ }
+
+ @Test (timeout = 60000)
+ public void testRegionReplicaReplicationIgnoresDisabledTables() throws Exception {
+ testRegionReplicaReplicationIgnoresDisabledTables(false);
+ }
+
+ @Test (timeout = 60000)
+ public void testRegionReplicaReplicationIgnoresDroppedTables() throws Exception {
+ testRegionReplicaReplicationIgnoresDisabledTables(true);
+ }
+
+ public void testRegionReplicaReplicationIgnoresDisabledTables(boolean dropTable)
+ throws Exception {
+ // tests that edits from a disabled or dropped table are handled correctly by skipping those
+ // entries, and that subsequent edits (after the edits from the dropped/disabled table) can be
+ // replicated without problems.
+ TableName tableName = TableName.valueOf("testRegionReplicaReplicationIgnoresDisabledTables"
+ + dropTable);
+ HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString());
+ int regionReplication = 3;
+ htd.setRegionReplication(regionReplication);
+ HTU.deleteTableIfAny(tableName);
+ HTU.getHBaseAdmin().createTable(htd);
+ TableName toBeDisabledTable = TableName.valueOf(dropTable ? "droppedTable" : "disabledTable");
+ HTU.deleteTableIfAny(toBeDisabledTable);
+ htd = HTU.createTableDescriptor(toBeDisabledTable.toString());
+ htd.setRegionReplication(regionReplication);
+ HTU.getHBaseAdmin().createTable(htd);
+
+ // both tables are created, now pause replication
+ ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());
+ admin.disablePeer(ServerRegionReplicaUtil.getReplicationPeerId());
+
+ // now that the replication is disabled, write to the table to be dropped, then drop the table.
+
+ HConnection connection = HConnectionManager.createConnection(HTU.getConfiguration());
+ HTableInterface table = connection.getTable(tableName);
+ HTableInterface tableToBeDisabled = connection.getTable(toBeDisabledTable);
+
+ HTU.loadNumericRows(tableToBeDisabled, HBaseTestingUtility.fam1, 6000, 7000);
+
+ AtomicLong skippedEdits = new AtomicLong();
+ RegionReplicaReplicationEndpoint.RegionReplicaOutputSink sink =
+ mock(RegionReplicaReplicationEndpoint.RegionReplicaOutputSink.class);
+ when(sink.getSkippedEditsCounter()).thenReturn(skippedEdits);
+ RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter sinkWriter =
+ new RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter(sink,
+ (ClusterConnection) connection,
+ Executors.newSingleThreadExecutor(), 1000);
+
+ HRegionLocation hrl = connection.locateRegion(toBeDisabledTable, HConstants.EMPTY_BYTE_ARRAY);
+ byte[] encodedRegionName = hrl.getRegionInfo().getEncodedNameAsBytes();
+
+ Entry entry = new Entry(
+ new WALKey(encodedRegionName, toBeDisabledTable, 1),
+ new WALEdit());
+
+ HTU.getHBaseAdmin().disableTable(toBeDisabledTable); // disable the table
+ if (dropTable) {
+ HTU.getHBaseAdmin().deleteTable(toBeDisabledTable);
+ }
+
+ sinkWriter.append(toBeDisabledTable, encodedRegionName,
+ HConstants.EMPTY_BYTE_ARRAY, Lists.newArrayList(entry, entry));
+
+ assertEquals(2, skippedEdits.get());
+
+ try {
+ // load some data to the to-be-dropped table
+
+ // load the data to the table
+ HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000);
+
+ // now enable the replication
+ admin.enablePeer(ServerRegionReplicaUtil.getReplicationPeerId());
+
+ verifyReplication(tableName, regionReplication, 0, 1000);
+
+ } finally {
+ admin.close();
+ table.close();
+ tableToBeDisabled.close();
+ HTU.deleteTableIfAny(toBeDisabledTable);
+ connection.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/055f5a95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
new file mode 100644
index 0000000..8ecfd76
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.closeRegion;
+import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.openRegion;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
+import org.apache.hadoop.hbase.coprocessor.BaseWALObserver;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.WALCoprocessorEnvironment;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint.ReplicateContext;
+import org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint.RegionReplicaReplayCallable;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Tests RegionReplicaReplicationEndpoint. Unlike TestRegionReplicaReplicationEndpoint this
+ * class contains lower level tests using callables.
+ */
+@Category(MediumTests.class)
+public class TestRegionReplicaReplicationEndpointNoMaster {
+
+ private static final Log LOG = LogFactory.getLog(
+ TestRegionReplicaReplicationEndpointNoMaster.class);
+
+ private static final int NB_SERVERS = 2;
+ private static TableName tableName = TableName.valueOf(
+ TestRegionReplicaReplicationEndpointNoMaster.class.getSimpleName());
+ private static HTable table;
+ private static final byte[] row = "TestRegionReplicaReplicator".getBytes();
+
+ private static HRegionServer rs0;
+ private static HRegionServer rs1;
+
+ private static HRegionInfo hriPrimary;
+ private static HRegionInfo hriSecondary;
+
+ private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
+ private static final byte[] f = HConstants.CATALOG_FAMILY;
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ Configuration conf = HTU.getConfiguration();
+ conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
+ conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
+
+ // install WALObserver coprocessor for tests
+ String walCoprocs = HTU.getConfiguration().get(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY);
+ if (walCoprocs == null) {
+ walCoprocs = WALEditCopro.class.getName();
+ } else {
+ walCoprocs += "," + WALEditCopro.class.getName();
+ }
+ HTU.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
+ walCoprocs);
+ HTU.startMiniCluster(NB_SERVERS);
+
+ // Create table then get the single region for our new table.
+ HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString());
+ table = HTU.createTable(htd, new byte[][]{f}, HTU.getConfiguration());
+
+ hriPrimary = table.getRegionLocation(row, false).getRegionInfo();
+
+ // mock a secondary region info to open
+ hriSecondary = new HRegionInfo(hriPrimary.getTable(), hriPrimary.getStartKey(),
+ hriPrimary.getEndKey(), hriPrimary.isSplit(), hriPrimary.getRegionId(), 1);
+
+ // No master
+ TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU);
+ rs0 = HTU.getMiniHBaseCluster().getRegionServer(0);
+ rs1 = HTU.getMiniHBaseCluster().getRegionServer(1);
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ table.close();
+ HTU.shutdownMiniCluster();
+ }
+
+ @Before
+ public void before() throws Exception{
+ entries.clear();
+ }
+
+ @After
+ public void after() throws Exception {
+ }
+
+ static ConcurrentLinkedQueue<Entry> entries = new ConcurrentLinkedQueue<Entry>();
+
+ public static class WALEditCopro extends BaseWALObserver {
+ public WALEditCopro() {
+ entries.clear();
+ }
+ @Override
+ public void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
+ HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
+ // only keep primary region's edits
+ if (logKey.getTablename().equals(tableName) && info.getReplicaId() == 0) {
+ entries.add(new Entry(logKey, logEdit));
+ }
+ }
+ }
+
+ @Test
+ public void testReplayCallable() throws Exception {
+ // tests replaying the edits to a secondary region replica using the Callable directly
+ openRegion(HTU, rs0, hriSecondary);
+ ClusterConnection connection =
+ (ClusterConnection) HConnectionManager.createConnection(HTU.getConfiguration());
+
+ // load some data to primary
+ HTU.loadNumericRows(table, f, 0, 1000);
+
+ Assert.assertEquals(1000, entries.size());
+ // replay the edits to the secondary using replay callable
+ replicateUsingCallable(connection, entries);
+
+ HRegion region = rs0.getFromOnlineRegions(hriSecondary.getEncodedName());
+ HTU.verifyNumericRows(region, f, 0, 1000);
+
+ HTU.deleteNumericRows(table, f, 0, 1000);
+ closeRegion(HTU, rs0, hriSecondary);
+ connection.close();
+ }
+
+ private void replicateUsingCallable(ClusterConnection connection, Queue<Entry> entries)
+ throws IOException, RuntimeException {
+ Entry entry;
+ while ((entry = entries.poll()) != null) {
+ byte[] row = entry.getEdit().getCells().get(0).getRow();
+ RegionLocations locations = connection.locateRegion(tableName, row, true, true);
+ RegionReplicaReplayCallable callable = new RegionReplicaReplayCallable(connection,
+ RpcControllerFactory.instantiate(connection.getConfiguration()),
+ table.getName(), locations.getRegionLocation(1),
+ locations.getRegionLocation(1).getRegionInfo(), row, Lists.newArrayList(entry),
+ new AtomicLong());
+
+ RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(
+ connection.getConfiguration());
+ factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, 10000);
+ }
+ }
+
+ @Test
+ public void testReplayCallableWithRegionMove() throws Exception {
+ // tests replaying the edits to a secondary region replica using the Callable directly while
+ // the region is moved to another location. It tests handling of RME (RegionMovedException).
+ openRegion(HTU, rs0, hriSecondary);
+ ClusterConnection connection =
+ (ClusterConnection) HConnectionManager.createConnection(HTU.getConfiguration());
+ // load some data to primary
+ HTU.loadNumericRows(table, f, 0, 1000);
+
+ Assert.assertEquals(1000, entries.size());
+ // replay the edits to the secondary using replay callable
+ replicateUsingCallable(connection, entries);
+
+ HRegion region = rs0.getFromOnlineRegions(hriSecondary.getEncodedName());
+ HTU.verifyNumericRows(region, f, 0, 1000);
+
+ HTU.loadNumericRows(table, f, 1000, 2000); // load some more data to primary
+
+ // move the secondary region from RS0 to RS1
+ closeRegion(HTU, rs0, hriSecondary);
+ openRegion(HTU, rs1, hriSecondary);
+
+ // replicate the new data
+ replicateUsingCallable(connection, entries);
+
+ region = rs1.getFromOnlineRegions(hriSecondary.getEncodedName());
+ // verify the new data. old data may or may not be there
+ HTU.verifyNumericRows(region, f, 1000, 2000);
+
+ HTU.deleteNumericRows(table, f, 0, 2000);
+ closeRegion(HTU, rs1, hriSecondary);
+ connection.close();
+ }
+
+ @Test
+ public void testRegionReplicaReplicationEndpointReplicate() throws Exception {
+ // tests replaying the edits to a secondary region replica using the RRRE.replicate()
+ openRegion(HTU, rs0, hriSecondary);
+ ClusterConnection connection =
+ (ClusterConnection) HConnectionManager.createConnection(HTU.getConfiguration());
+ RegionReplicaReplicationEndpoint replicator = new RegionReplicaReplicationEndpoint();
+
+ ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class);
+ when(context.getConfiguration()).thenReturn(HTU.getConfiguration());
+
+ replicator.init(context);
+ replicator.start();
+
+ // load some data to primary
+ HTU.loadNumericRows(table, f, 0, 1000);
+
+ Assert.assertEquals(1000, entries.size());
+ // replay the edits to the secondary using replay callable
+ replicator.replicate(new ReplicateContext().setEntries(Lists.newArrayList(entries)));
+
+ HRegion region = rs0.getFromOnlineRegions(hriSecondary.getEncodedName());
+ HTU.verifyNumericRows(region, f, 0, 1000);
+
+ HTU.deleteNumericRows(table, f, 0, 1000);
+ closeRegion(HTU, rs0, hriSecondary);
+ connection.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/055f5a95/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java
index f6bd2b1..1f07a4c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.util.NavigableSet;
@@ -39,6 +38,7 @@ import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers;
+import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -128,10 +128,8 @@ public class TestWALMethods {
Configuration conf = new Configuration();
RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
- WALSplitter splitter = new WALSplitter(WALFactory.getInstance(conf),
- conf, mock(Path.class), mock(FileSystem.class), null, null, mode);
- EntryBuffers sink = splitter.new EntryBuffers(1*1024*1024);
+ EntryBuffers sink = new EntryBuffers(new PipelineController(), 1*1024*1024);
for (int i = 0; i < 1000; i++) {
WAL.Entry entry = createTestLogEntry(i);
sink.appendEntry(entry);
[2/2] hbase git commit: HBASE-11568. Async WAL replication for region replicas (Enis Soztutar)
Posted by dd...@apache.org.
HBASE-11568. Async WAL replication for region replicas (Enis Soztutar)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/055f5a95
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/055f5a95
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/055f5a95
Branch: refs/heads/branch-1
Commit: 055f5a95e46e55412e05892f86fc0e0e2792356c
Parents: fb867f7
Author: Devaraj Das <dd...@apache.org>
Authored: Fri Feb 6 07:48:32 2015 -0800
Committer: Devaraj Das <dd...@apache.org>
Committed: Fri Feb 6 07:48:32 2015 -0800
----------------------------------------------------------------------
.../hadoop/hbase/client/RpcRetryingCaller.java | 2 +-
.../src/main/resources/hbase-default.xml | 14 +
.../master/handler/CreateTableHandler.java | 13 +-
.../hbase/protobuf/ReplicationProtbufUtil.java | 20 +-
.../hbase/regionserver/RSRpcServices.java | 23 +-
.../RegionReplicaReplicationEndpoint.java | 558 +++++++++++++++++++
.../regionserver/ReplicationSource.java | 3 +-
.../hbase/util/ServerRegionReplicaUtil.java | 51 ++
.../apache/hadoop/hbase/wal/WALSplitter.java | 206 ++++---
.../hadoop/hbase/HBaseTestingUtility.java | 35 +-
.../hbase/regionserver/TestRegionReplicas.java | 95 +---
.../regionserver/TestRegionServerNoMaster.java | 94 ++--
.../TestRegionReplicaReplicationEndpoint.java | 345 ++++++++++++
...egionReplicaReplicationEndpointNoMaster.java | 265 +++++++++
.../apache/hadoop/hbase/wal/TestWALMethods.java | 6 +-
15 files changed, 1520 insertions(+), 210 deletions(-)
----------------------------------------------------------------------
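For context, here is a minimal client-side sketch of how this feature is exercised,
distilled from the new tests in this commit. The table name, column family, and
replica count below are illustrative, not part of the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.HBaseAdmin;

    Configuration conf = HBaseConfiguration.create();
    // both flags must be set cluster-wide before the servers start
    conf.setBoolean("hbase.replication", true);
    conf.setBoolean("hbase.region.replica.replication.enabled", true);

    // a table with region replication 3 gets 1 primary + 2 secondary replicas;
    // creating it also sets up the "region_replica_replication" peer
    // (see the CreateTableHandler change below)
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("example_table"));
    htd.addFamily(new HColumnDescriptor("f1"));
    htd.setRegionReplication(3);
    HBaseAdmin admin = new HBaseAdmin(conf);
    admin.createTable(htd);
    admin.close();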
http://git-wip-us.apache.org/repos/asf/hbase/blob/055f5a95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
index b2020bd..896222c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
@@ -49,7 +49,7 @@ import com.google.protobuf.ServiceException;
*/
@InterfaceAudience.Private
public class RpcRetryingCaller<T> {
- static final Log LOG = LogFactory.getLog(RpcRetryingCaller.class);
+ public static final Log LOG = LogFactory.getLog(RpcRetryingCaller.class);
/**
* When we started making calls.
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/055f5a95/hbase-common/src/main/resources/hbase-default.xml
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index b10a006..c14bb8b 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -1288,6 +1288,20 @@ possible configurations would overwhelm and obscure the important.
</description>
</property>
<property>
+ <name>hbase.region.replica.replication.enabled</name>
+ <value>false</value>
+ <description>
+ Whether asynchronous WAL replication to the secondary region replicas is enabled.
+ If this is enabled, a replication peer named "region_replica_replication" will be created
+ which will tail the logs and replicate the mutations to region replicas for tables that
+ have region replication > 1. Once enabled, disabling this replication also requires
+ disabling the replication peer using the shell or the ReplicationAdmin java class.
+ Replication to secondary region replicas works over standard inter-cluster replication,
+ so if replication has been explicitly disabled, "hbase.replication" also has to be set
+ to true for this feature to work.
+ </description>
+ </property>
+ <property>
<name>hbase.http.filter.initializers</name>
<value>org.apache.hadoop.hbase.http.lib.StaticUserWebFilter</value>
<description>
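As the description above notes, once the feature has been enabled, turning it off
also requires disabling the replication peer. A short sketch using the
ReplicationAdmin API exercised by the tests in this commit (the peer id created by
this patch is "region_replica_replication"):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;

    Configuration conf = HBaseConfiguration.create();
    ReplicationAdmin admin = new ReplicationAdmin(conf);
    // pause replication to the secondary replicas without removing the peer
    admin.disablePeer("region_replica_replication");
    // or drop the peer entirely; the tests obtain this id via
    // ServerRegionReplicaUtil.getReplicationPeerId()
    // admin.removePeer("region_replica_replication");
    admin.close();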
http://git-wip-us.apache.org/repos/asf/hbase/blob/055f5a95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java
index a17432c..f9d0d24 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
/**
* Handler to create a table.
@@ -215,9 +216,8 @@ public class CreateTableHandler extends EventHandler {
*/
protected void completed(final Throwable exception) {
releaseTableLock();
- String msg = exception == null ? null : exception.getMessage();
LOG.info("Table, " + this.hTableDescriptor.getTableName() + ", creation " +
- msg == null ? "successful" : "failed. " + msg);
+ (exception == null ? "successful" : "failed. " + exception));
if (exception != null) {
removeEnablingTable(this.assignmentManager, this.hTableDescriptor.getTableName());
}
@@ -262,11 +262,16 @@ public class CreateTableHandler extends EventHandler {
// 5. Add replicas if needed
regionInfos = addReplicas(hTableDescriptor, regionInfos);
- // 6. Trigger immediate assignment of the regions in round-robin fashion
+ // 6. Setup replication for region replicas if needed
+ if (hTableDescriptor.getRegionReplication() > 1) {
+ ServerRegionReplicaUtil.setupRegionReplicaReplication(conf);
+ }
+
+ // 7. Trigger immediate assignment of the regions in round-robin fashion
ModifyRegionUtils.assignRegions(assignmentManager, regionInfos);
}
- // 7. Set table enabled flag up in zk.
+ // 8. Set table enabled flag up in zk.
try {
assignmentManager.getTableStateManager().setTableState(tableName,
ZooKeeperProtos.Table.State.ENABLED);
http://git-wip-us.apache.org/repos/asf/hbase/blob/055f5a95/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
index d68d247..61d1a9a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
@@ -59,7 +59,7 @@ public class ReplicationProtbufUtil {
public static void replicateWALEntry(final AdminService.BlockingInterface admin,
final Entry[] entries) throws IOException {
Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
- buildReplicateWALEntryRequest(entries);
+ buildReplicateWALEntryRequest(entries, null);
PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond());
try {
admin.replicateWALEntry(controller, p.getFirst());
@@ -78,6 +78,20 @@ public class ReplicationProtbufUtil {
public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
buildReplicateWALEntryRequest(final Entry[] entries) {
// Accumulate all the Cells seen in here.
+ return buildReplicateWALEntryRequest(entries, null);
+ }
+
+ /**
+ * Create a new ReplicateWALEntryRequest from a list of HLog entries
+ *
+ * @param entries the HLog entries to be replicated
+ * @param encodedRegionName alternative region name to use if not null
+ * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values
+ * found.
+ */
+ public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
+ buildReplicateWALEntryRequest(final Entry[] entries, byte[] encodedRegionName) {
+ // Accumulate all the KVs seen in here.
List<List<? extends Cell>> allCells = new ArrayList<List<? extends Cell>>(entries.length);
int size = 0;
WALProtos.FamilyScope.Builder scopeBuilder = WALProtos.FamilyScope.newBuilder();
@@ -91,7 +105,9 @@ public class ReplicationProtbufUtil {
WALProtos.WALKey.Builder keyBuilder = entryBuilder.getKeyBuilder();
WALKey key = entry.getKey();
keyBuilder.setEncodedRegionName(
- ByteStringer.wrap(key.getEncodedRegionName()));
+ ByteStringer.wrap(encodedRegionName == null
+ ? key.getEncodedRegionName()
+ : encodedRegionName));
keyBuilder.setTableName(ByteStringer.wrap(key.getTablename().getName()));
keyBuilder.setLogSequenceNumber(key.getLogSeqNum());
keyBuilder.setWriteTime(key.getWriteTime());
http://git-wip-us.apache.org/repos/asf/hbase/blob/055f5a95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index b5a2343..f2e5571 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
@@ -157,6 +158,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Counter;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALSplitter;
@@ -1452,11 +1454,24 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// empty input
return ReplicateWALEntryResponse.newBuilder().build();
}
- HRegion region = regionServer.getRegionByEncodedName(
- entries.get(0).getKey().getEncodedRegionName().toStringUtf8());
- RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
+ ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
+ HRegion region = regionServer.getRegionByEncodedName(regionName.toStringUtf8());
+ RegionCoprocessorHost coprocessorHost =
+ ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
+ ? region.getCoprocessorHost()
+ : null; // do not invoke coprocessors if this is a secondary region replica
List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<Pair<WALKey, WALEdit>>();
+
+ // Skip adding the edits to WAL if this is a secondary region replica
+ boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
+ Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;
+
for (WALEntry entry : entries) {
+ if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
+ throw new NotServingRegionException("Replay request contains entries from multiple " +
+ "regions. First region:" + regionName.toStringUtf8() + " , other region:"
+ + entry.getKey().getEncodedRegionName());
+ }
if (regionServer.nonceManager != null) {
long nonceGroup = entry.getKey().hasNonceGroup()
? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
@@ -1466,7 +1481,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
new Pair<WALKey, WALEdit>();
List<WALSplitter.MutationReplay> edits = WALSplitter.getMutationsFromWALEntry(entry,
- cells, walEntry);
+ cells, walEntry, durability);
if (coprocessorHost != null) {
// Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
// KeyValue.
http://git-wip-us.apache.org/repos/asf/hbase/blob/055f5a95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
new file mode 100644
index 0000000..3dab12a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -0,0 +1,558 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.RegionAdminServiceCallable;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException;
+import org.apache.hadoop.hbase.client.RetryingCallable;
+import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
+import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
+import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter;
+import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers;
+import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink;
+import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.protobuf.ServiceException;
+
+/**
+ * A {@link ReplicationEndpoint} endpoint which receives the WAL edits from the
+ * WAL, and sends the edits to replicas of regions.
+ */
+@InterfaceAudience.Private
+public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
+
+ private static final Log LOG = LogFactory.getLog(RegionReplicaReplicationEndpoint.class);
+
+ private Configuration conf;
+ private ClusterConnection connection;
+
+ // Reuse WALSplitter constructs as a WAL pipe
+ private PipelineController controller;
+ private RegionReplicaOutputSink outputSink;
+ private EntryBuffers entryBuffers;
+
+ // Number of writer threads
+ private int numWriterThreads;
+
+ private int operationTimeout;
+
+ private ExecutorService pool;
+
+ @Override
+ public void init(Context context) throws IOException {
+ super.init(context);
+
+ this.conf = HBaseConfiguration.create(context.getConfiguration());
+
+ String codecClassName = conf
+ .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
+ conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
+
+ this.numWriterThreads = this.conf.getInt(
+ "hbase.region.replica.replication.writer.threads", 3);
+ controller = new PipelineController();
+ entryBuffers = new EntryBuffers(controller,
+ this.conf.getInt("hbase.region.replica.replication.buffersize",
+ 128*1024*1024));
+
+ // use the regular RPC timeout for replica replication RPC's
+ this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+ HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
+ }
+
+ @Override
+ protected void doStart() {
+ try {
+ connection = (ClusterConnection) HConnectionManager.createConnection(ctx.getConfiguration());
+ this.pool = getDefaultThreadPool(conf);
+ outputSink = new RegionReplicaOutputSink(controller, entryBuffers, connection, pool,
+ numWriterThreads, operationTimeout);
+ outputSink.startWriterThreads();
+ super.doStart();
+ } catch (IOException ex) {
+ LOG.warn("Received exception while creating connection :" + ex);
+ notifyFailed(ex);
+ }
+ }
+
+ @Override
+ protected void doStop() {
+ if (outputSink != null) {
+ try {
+ outputSink.finishWritingAndClose();
+ } catch (IOException ex) {
+ LOG.warn("Got exception while trying to close OutputSink");
+ LOG.warn(ex);
+ }
+ }
+ if (this.pool != null) {
+ this.pool.shutdownNow();
+ try {
+ // wait for 10 sec
+ boolean shutdown = this.pool.awaitTermination(10000, TimeUnit.MILLISECONDS);
+ if (!shutdown) {
+ LOG.warn("Failed to shutdown the thread pool after 10 seconds");
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("Got interrupted while waiting for the thread pool to shut down" + e);
+ }
+ }
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (IOException ex) {
+ LOG.warn("Got exception closing connection :" + ex);
+ }
+ }
+ super.doStop();
+ }
+
+ /**
+ * Returns a Thread pool for the RPC's to region replicas. Similar to
+ * Connection's thread pool.
+ */
+ private ExecutorService getDefaultThreadPool(Configuration conf) {
+ int maxThreads = conf.getInt("hbase.region.replica.replication.threads.max", 256);
+ int coreThreads = conf.getInt("hbase.region.replica.replication.threads.core", 16);
+ if (maxThreads == 0) {
+ maxThreads = Runtime.getRuntime().availableProcessors() * 8;
+ }
+ if (coreThreads == 0) {
+ coreThreads = Runtime.getRuntime().availableProcessors() * 8;
+ }
+ long keepAliveTime = conf.getLong("hbase.region.replica.replication.threads.keepalivetime", 60);
+ LinkedBlockingQueue<Runnable> workQueue =
+ new LinkedBlockingQueue<Runnable>(maxThreads *
+ conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
+ HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
+ ThreadPoolExecutor tpe = new ThreadPoolExecutor(
+ coreThreads,
+ maxThreads,
+ keepAliveTime,
+ TimeUnit.SECONDS,
+ workQueue,
+ Threads.newDaemonThreadFactory(this.getClass().toString() + "-rpc-shared-"));
+ tpe.allowCoreThreadTimeOut(true);
+ return tpe;
+ }
+
+ @Override
+ public boolean replicate(ReplicateContext replicateContext) {
+ /* A note on batching in RegionReplicaReplicationEndpoint (RRRE):
+ *
+ * RRRE relies on batching from two different mechanisms. The first is the batching from
+ * ReplicationSource since RRRE is a ReplicationEndpoint driven by RS. RS reads from a single
+ * WAL file filling up a buffer of heap size "replication.source.size.capacity"(64MB) or at most
+ * "replication.source.nb.capacity" entries or until it sees the end of file (in live tailing).
+ * Then RS passes all the buffered edits in this replicate() call context. RRRE puts the edits
+ * to the WALSplitter.EntryBuffers which is a blocking buffer space of up to
+ * "hbase.region.replica.replication.buffersize" (128MB) in size. This buffer splits the edits
+ * based on regions.
+ *
+ * There are "hbase.region.replica.replication.writer.threads"(default 3) writer threads which
+ * pick largest per-region buffer and send it to the SinkWriter (see RegionReplicaOutputSink).
+ * The SinkWriter in this case will send the wal edits to all secondary region replicas in
+ * parallel via a retrying rpc call. EntryBuffers guarantees that while a buffer is
+ * being written to the sink, another buffer for the same region will not be made available to
+ * writers, ensuring a region's edits are not replayed out of order.
+ *
+ * The replicate() call won't return until all the buffers are sent and ack'd by the sinks so
+ * that the replication can assume all edits are persisted. We may be able to do a better
+ * pipelining between the replication thread and output sinks later if it becomes a bottleneck.
+ */
+
+ while (this.isRunning()) {
+ try {
+ for (Entry entry: replicateContext.getEntries()) {
+ entryBuffers.appendEntry(entry);
+ }
+ outputSink.flush(); // make sure everything is flushed
+ return true;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return false;
+ } catch (IOException e) {
+ LOG.warn("Received IOException while trying to replicate"
+ + StringUtils.stringifyException(e));
+ }
+ }
+
+ return false;
+ }
+
+ @Override
+ public boolean canReplicateToSameCluster() {
+ return true;
+ }
+
+ @Override
+ protected WALEntryFilter getScopeWALEntryFilter() {
+ // we do not care about scope. We replicate everything.
+ return null;
+ }
+
+ static class RegionReplicaOutputSink extends OutputSink {
+ private RegionReplicaSinkWriter sinkWriter;
+
+ public RegionReplicaOutputSink(PipelineController controller, EntryBuffers entryBuffers,
+ ClusterConnection connection, ExecutorService pool, int numWriters, int operationTimeout) {
+ super(controller, entryBuffers, numWriters);
+ this.sinkWriter = new RegionReplicaSinkWriter(this, connection, pool, operationTimeout);
+ }
+
+ @Override
+ public void append(RegionEntryBuffer buffer) throws IOException {
+ List<Entry> entries = buffer.getEntryBuffer();
+
+ if (entries.isEmpty() || entries.get(0).getEdit().getCells().isEmpty()) {
+ return;
+ }
+
+ sinkWriter.append(buffer.getTableName(), buffer.getEncodedRegionName(),
+ entries.get(0).getEdit().getCells().get(0).getRow(), entries);
+ }
+
+ @Override
+ public boolean flush() throws IOException {
+ // nothing much to do for now. Wait for the Writer threads to finish up
+ // append()'ing the data.
+ entryBuffers.waitUntilDrained();
+ return super.flush();
+ }
+
+ @Override
+ public List<Path> finishWritingAndClose() throws IOException {
+ finishWriting();
+ return null;
+ }
+
+ @Override
+ public Map<byte[], Long> getOutputCounts() {
+ return null; // only used in tests
+ }
+
+ @Override
+ public int getNumberOfRecoveredRegions() {
+ return 0;
+ }
+
+ AtomicLong getSkippedEditsCounter() {
+ return skippedEdits;
+ }
+ }
+
+ static class RegionReplicaSinkWriter extends SinkWriter {
+ RegionReplicaOutputSink sink;
+ ClusterConnection connection;
+ RpcControllerFactory rpcControllerFactory;
+ RpcRetryingCallerFactory rpcRetryingCallerFactory;
+ int operationTimeout;
+ ExecutorService pool;
+ Cache<TableName, Boolean> disabledAndDroppedTables;
+
+ public RegionReplicaSinkWriter(RegionReplicaOutputSink sink, ClusterConnection connection,
+ ExecutorService pool, int operationTimeout) {
+ this.sink = sink;
+ this.connection = connection;
+ this.operationTimeout = operationTimeout;
+ this.rpcRetryingCallerFactory
+ = RpcRetryingCallerFactory.instantiate(connection.getConfiguration());
+ this.rpcControllerFactory = RpcControllerFactory.instantiate(connection.getConfiguration());
+ this.pool = pool;
+
+ int nonExistentTableCacheExpiryMs = connection.getConfiguration()
+ .getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", 5000);
+ // A cache for non-existent tables, with a default expiry of 5 sec. This means that if a
+ // table is created again with the same name, we might fail to replicate for that amount of
+ // time. But this cache prevents overloading meta with requests for every edit of a dropped table.
+ disabledAndDroppedTables = CacheBuilder.newBuilder()
+ .expireAfterWrite(nonExistentTableCacheExpiryMs, TimeUnit.MILLISECONDS)
+ .initialCapacity(10)
+ .maximumSize(1000)
+ .build();
+ }
+
+ public void append(TableName tableName, byte[] encodedRegionName, byte[] row,
+ List<Entry> entries) throws IOException {
+
+ if (disabledAndDroppedTables.getIfPresent(tableName) != null) {
+ sink.getSkippedEditsCounter().incrementAndGet();
+ return;
+ }
+
+ // get the replicas of the primary region
+ RegionLocations locations = null;
+ try {
+ locations = getRegionLocations(connection, tableName, row, true, 0);
+
+ if (locations == null) {
+ throw new HBaseIOException("Cannot locate locations for "
+ + tableName + ", row:" + Bytes.toStringBinary(row));
+ }
+ } catch (TableNotFoundException e) {
+ disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache. Value ignored
+ // skip this entry
+ sink.getSkippedEditsCounter().addAndGet(entries.size());
+ return;
+ }
+
+ if (locations.size() == 1) {
+ return;
+ }
+
+ ArrayList<Future<ReplicateWALEntryResponse>> tasks
+ = new ArrayList<Future<ReplicateWALEntryResponse>>(2);
+
+ // check whether we should still replay this entry. If the regions have changed, or the
+ // entry is not coming from the primary region, filter it out.
+ HRegionLocation primaryLocation = locations.getDefaultRegionLocation();
+ if (!Bytes.equals(primaryLocation.getRegionInfo().getEncodedNameAsBytes(),
+ encodedRegionName)) {
+ sink.getSkippedEditsCounter().addAndGet(entries.size());
+ return;
+ }
+
+
+ // All passed entries should belong to one region because it is coming from the EntryBuffers
+ // split per region. But the regions might split and merge (unlike log recovery case).
+ for (int replicaId = 0; replicaId < locations.size(); replicaId++) {
+ HRegionLocation location = locations.getRegionLocation(replicaId);
+ if (!RegionReplicaUtil.isDefaultReplica(replicaId)) {
+ HRegionInfo regionInfo = location == null
+ ? RegionReplicaUtil.getRegionInfoForReplica(
+ locations.getDefaultRegionLocation().getRegionInfo(), replicaId)
+ : location.getRegionInfo();
+ RegionReplicaReplayCallable callable = new RegionReplicaReplayCallable(connection,
+ rpcControllerFactory, tableName, location, regionInfo, row, entries,
+ sink.getSkippedEditsCounter());
+ Future<ReplicateWALEntryResponse> task = pool.submit(
+ new RetryingRpcCallable<ReplicateWALEntryResponse>(rpcRetryingCallerFactory,
+ callable, operationTimeout));
+ tasks.add(task);
+ }
+ }
+
+ boolean tasksCancelled = false;
+ for (Future<ReplicateWALEntryResponse> task : tasks) {
+ try {
+ task.get();
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException(e.getMessage());
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof IOException) {
+ // The table can be disabled or dropped at this time. For disabled tables, we have no
+ // cheap mechanism to detect this case because meta does not contain this information.
+ // HConnection.isTableDisabled() is a zk call which we cannot do for every replay RPC.
+ // So instead we start the replay RPC with retries and
+ // check whether the table is dropped or disabled which might cause
+ // SocketTimeoutException, or RetriesExhaustedException or similar if we get IOE.
+ if (cause instanceof TableNotFoundException || connection.isTableDisabled(tableName)) {
+ disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache for later.
+ if (!tasksCancelled) {
+ sink.getSkippedEditsCounter().addAndGet(entries.size());
+ tasksCancelled = true; // so that we do not add to skipped counter again
+ }
+ continue;
+ }
+ // otherwise rethrow
+ throw (IOException)cause;
+ }
+ // unexpected exception
+ throw new IOException(cause);
+ }
+ }
+ }
+ }
+
+ static class RetryingRpcCallable<V> implements Callable<V> {
+ RpcRetryingCallerFactory factory;
+ RetryingCallable<V> callable;
+ int timeout;
+ public RetryingRpcCallable(RpcRetryingCallerFactory factory, RetryingCallable<V> callable,
+ int timeout) {
+ this.factory = factory;
+ this.callable = callable;
+ this.timeout = timeout;
+ }
+ @Override
+ public V call() throws Exception {
+ return factory.<V>newCaller().callWithRetries(callable, timeout);
+ }
+ }
+
+ /**
+ * Calls replay on the passed set of WAL entries belonging to a single region. Skips the
+ * entries if the region boundaries have changed or the region is gone.
+ */
+ static class RegionReplicaReplayCallable
+ extends RegionAdminServiceCallable<ReplicateWALEntryResponse> {
+ // replicaId of the region replica that we want to replicate to
+ private final int replicaId;
+
+ private final List<Entry> entries;
+ private final byte[] initialEncodedRegionName;
+ private final AtomicLong skippedEntries;
+ private final RpcControllerFactory rpcControllerFactory;
+ private boolean skip;
+
+ public RegionReplicaReplayCallable(ClusterConnection connection,
+ RpcControllerFactory rpcControllerFactory, TableName tableName,
+ HRegionLocation location, HRegionInfo regionInfo, byte[] row, List<Entry> entries,
+ AtomicLong skippedEntries) {
+ super(connection, location, tableName, row);
+ this.replicaId = regionInfo.getReplicaId();
+ this.entries = entries;
+ this.rpcControllerFactory = rpcControllerFactory;
+ this.skippedEntries = skippedEntries;
+ this.initialEncodedRegionName = regionInfo.getEncodedNameAsBytes();
+ }
+
+ @Override
+ public HRegionLocation getLocation(boolean useCache) throws IOException {
+ RegionLocations rl = getRegionLocations(connection, tableName, row, useCache, replicaId);
+ if (rl == null) {
+ throw new HBaseIOException(getExceptionMessage());
+ }
+ location = rl.getRegionLocation(replicaId);
+ if (location == null) {
+ throw new HBaseIOException(getExceptionMessage());
+ }
+
+ // check whether we should still replay this entry. If the regions have changed, or the
+ // entry is not coming from the primary region, filter it out because we do not need it.
+ // Regions can change because of (1) region split (2) region merge (3) table recreated
+ if (!Bytes.equals(location.getRegionInfo().getEncodedNameAsBytes(),
+ initialEncodedRegionName)) {
+ skip = true;
+ return null;
+ }
+
+ return location;
+ }
+
+ @Override
+ public ReplicateWALEntryResponse call(int timeout) throws IOException {
+ return replayToServer(this.entries, timeout);
+ }
+
+ private ReplicateWALEntryResponse replayToServer(List<Entry> entries, int timeout)
+ throws IOException {
+ if (entries.isEmpty() || skip) {
+ skippedEntries.incrementAndGet();
+ return ReplicateWALEntryResponse.newBuilder().build();
+ }
+
+ Entry[] entriesArray = new Entry[entries.size()];
+ entriesArray = entries.toArray(entriesArray);
+
+ // set the region name for the target region replica
+ Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
+ ReplicationProtbufUtil.buildReplicateWALEntryRequest(
+ entriesArray, location.getRegionInfo().getEncodedNameAsBytes());
+ try {
+ PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond());
+ controller.setCallTimeout(timeout);
+ controller.setPriority(tableName);
+ return stub.replay(controller, p.getFirst());
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
+ }
+
+ @Override
+ protected String getExceptionMessage() {
+ return super.getExceptionMessage() + " table=" + tableName
+ + " ,replica=" + replicaId + ", row=" + Bytes.toStringBinary(row);
+ }
+ }
+
+ private static RegionLocations getRegionLocations(
+ ClusterConnection connection, TableName tableName, byte[] row,
+ boolean useCache, int replicaId)
+ throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException {
+ RegionLocations rl;
+ try {
+ rl = connection.locateRegion(tableName, row, useCache, true, replicaId);
+ } catch (DoNotRetryIOException e) {
+ throw e;
+ } catch (RetriesExhaustedException e) {
+ throw e;
+ } catch (InterruptedIOException e) {
+ throw e;
+ } catch (IOException e) {
+ throw new RetriesExhaustedException("Can't get the location", e);
+ }
+ if (rl == null) {
+ throw new RetriesExhaustedException("Can't get the locations");
+ }
+
+ return rl;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/055f5a95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 6e2ef2d..ee43956 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -717,7 +717,8 @@ public class ReplicationSource extends Thread
}
break;
} catch (Exception ex) {
- LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:" + ex);
+ LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:" +
+ org.apache.hadoop.util.StringUtils.stringifyException(ex));
if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
sleepMultiplier++;
}
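
The one-line change above is worth a note: concatenating a Throwable into a log message
invokes its toString(), which yields only the class name and message, while Hadoop's
StringUtils.stringifyException() renders the full stack trace into the string. A small
sketch of the idioms side by side (the class and message here are invented for
illustration):

  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.apache.hadoop.util.StringUtils;

  public class ExceptionLoggingSketch {
    private static final Log LOG = LogFactory.getLog(ExceptionLoggingSketch.class);

    void demo(Exception ex) {
      // before the change: logs only ex.toString() -- class name plus message
      LOG.warn("endpoint threw unknown exception:" + ex);
      // after the change: the full stack trace is folded into the message
      LOG.warn("endpoint threw unknown exception:" + StringUtils.stringifyException(ex));
      // the two-argument overload is another way to capture the stack trace
      LOG.warn("endpoint threw unknown exception", ex);
    }
  }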
http://git-wip-us.apache.org/repos/asf/hbase/blob/055f5a95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
index 7dafa68..cf87219 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
@@ -25,10 +25,15 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
/**
* Similar to {@link RegionReplicaUtil} but for the server side
@@ -36,6 +41,21 @@ import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
public class ServerRegionReplicaUtil extends RegionReplicaUtil {
/**
+ * Whether asynchronous WAL replication to the secondary region replicas is enabled or not.
+ * If this is enabled, a replication peer named "region_replica_replication" will be created
+ * which will tail the logs and replicate the mutations to region replicas for tables that
+ * have region replication > 1. Once enabled, disabling this replication also requires
+ * disabling the replication peer using shell or the ReplicationAdmin java class.
+ * Replication to secondary region replicas works over standard inter-cluster replication,
+ * so if replication was disabled explicitly via "hbase.replication", it has to be set back
+ * to true for this feature to work.
+ */
+ public static final String REGION_REPLICA_REPLICATION_CONF_KEY
+ = "hbase.region.replica.replication.enabled";
+ private static final boolean DEFAULT_REGION_REPLICA_REPLICATION = false;
+ private static final String REGION_REPLICA_REPLICATION_PEER = "region_replica_replication";
+
+ /**
* Returns the regionInfo object to use for interacting with the file system.
* @return An HRegionInfo object to interact with the filesystem
*/
@@ -95,4 +115,35 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
return new StoreFileInfo(conf, fs, status, link);
}
+ /**
+ * Create replication peer for replicating to region replicas if needed.
+ * @param conf configuration to use
+ * @throws IOException
+ */
+ public static void setupRegionReplicaReplication(Configuration conf) throws IOException {
+ if (!conf.getBoolean(REGION_REPLICA_REPLICATION_CONF_KEY, DEFAULT_REGION_REPLICA_REPLICATION)) {
+ return;
+ }
+ ReplicationAdmin repAdmin = new ReplicationAdmin(conf);
+ try {
+ if (repAdmin.getPeerConfig(REGION_REPLICA_REPLICATION_PEER) == null) {
+ ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
+ peerConfig.setClusterKey(ZKUtil.getZooKeeperClusterKey(conf));
+ peerConfig.setReplicationEndpointImpl(RegionReplicaReplicationEndpoint.class.getName());
+ repAdmin.addPeer(REGION_REPLICA_REPLICATION_PEER, peerConfig, null);
+ }
+ } catch (ReplicationException ex) {
+ throw new IOException(ex);
+ } finally {
+ repAdmin.close();
+ }
+ }
+
+ /**
+ * Return the peer id used for replicating to secondary region replicas
+ */
+ public static String getReplicationPeerId() {
+ return REGION_REPLICA_REPLICATION_PEER;
+ }
+
}
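
Putting the two configuration requirements from the javadoc above together: enabling the
feature amounts to setting both flags and letting setupRegionReplicaReplication() create
the peer. A sketch under the assumption that the method is normally invoked during server
startup; calling it directly as below is only for illustration:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;

  public class EnableRegionReplicaReplicationSketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      // cluster-level replication must be on ...
      conf.setBoolean("hbase.replication", true);
      // ... and the region replica replication feature itself enabled
      conf.setBoolean("hbase.region.replica.replication.enabled", true);
      // creates the "region_replica_replication" peer if it does not exist yet
      ServerRegionReplicaUtil.setupRegionReplicaReplication(conf);
    }
  }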
http://git-wip-us.apache.org/repos/asf/hbase/blob/055f5a95/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 1744adf..a436f29 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -151,6 +151,7 @@ public class WALSplitter {
// Major subcomponents of the split process.
// These are separated into inner classes to make testing easier.
+ PipelineController controller;
OutputSink outputSink;
EntryBuffers entryBuffers;
@@ -159,14 +160,6 @@ public class WALSplitter {
private BaseCoordinatedStateManager csm;
private final WALFactory walFactory;
- // If an exception is thrown by one of the other threads, it will be
- // stored here.
- protected AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
-
- // Wait/notify for when data has been produced by the reader thread,
- // consumed by the reader thread, or an exception occurred
- final Object dataAvailable = new Object();
-
private MonitoredTask status;
// For checking the latest flushed sequence id
@@ -202,8 +195,9 @@ public class WALSplitter {
this.sequenceIdChecker = idChecker;
this.csm = (BaseCoordinatedStateManager)csm;
this.walFactory = factory;
+ this.controller = new PipelineController();
- entryBuffers = new EntryBuffers(
+ entryBuffers = new EntryBuffers(controller,
this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
128*1024*1024));
@@ -214,13 +208,13 @@ public class WALSplitter {
this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
if (csm != null && this.distributedLogReplay) {
- outputSink = new LogReplayOutputSink(numWriterThreads);
+ outputSink = new LogReplayOutputSink(controller, entryBuffers, numWriterThreads);
} else {
if (this.distributedLogReplay) {
LOG.info("ZooKeeperWatcher is passed in as NULL so disable distrubitedLogRepaly.");
}
this.distributedLogReplay = false;
- outputSink = new LogRecoveredEditsOutputSink(numWriterThreads);
+ outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
}
}
@@ -828,22 +822,6 @@ public class WALSplitter {
}
}
- private void writerThreadError(Throwable t) {
- thrown.compareAndSet(null, t);
- }
-
- /**
- * Check for errors in the writer threads. If any is found, rethrow it.
- */
- private void checkForErrors() throws IOException {
- Throwable thrown = this.thrown.get();
- if (thrown == null) return;
- if (thrown instanceof IOException) {
- throw new IOException(thrown);
- } else {
- throw new RuntimeException(thrown);
- }
- }
/**
* Create a new {@link Writer} for writing log splits.
* @return a new Writer instance, caller should close
@@ -873,13 +851,45 @@ public class WALSplitter {
}
/**
+ * Contains some methods to control WAL-entries producer / consumer interactions
+ */
+ public static class PipelineController {
+ // If an exception is thrown by one of the other threads, it will be
+ // stored here.
+ AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
+
+ // Wait/notify for when data has been produced by the reader thread,
+ // consumed by the writer threads, or an exception occurred
+ public final Object dataAvailable = new Object();
+
+ void writerThreadError(Throwable t) {
+ thrown.compareAndSet(null, t);
+ }
+
+ /**
+ * Check for errors in the writer threads. If any is found, rethrow it.
+ */
+ void checkForErrors() throws IOException {
+ Throwable thrown = this.thrown.get();
+ if (thrown == null) return;
+ if (thrown instanceof IOException) {
+ throw new IOException(thrown);
+ } else {
+ throw new RuntimeException(thrown);
+ }
+ }
+ }
+
+ /**
* Class which accumulates edits and separates them into a buffer per region
* while simultaneously accounting for RAM usage. Blocks if the RAM usage crosses
* a predefined threshold.
*
* Writer threads then pull region-specific buffers from this class.
*/
- class EntryBuffers {
+ public static class EntryBuffers {
+ PipelineController controller;
+
Map<byte[], RegionEntryBuffer> buffers =
new TreeMap<byte[], RegionEntryBuffer>(Bytes.BYTES_COMPARATOR);
@@ -891,7 +901,8 @@ public class WALSplitter {
long totalBuffered = 0;
long maxHeapUsage;
- EntryBuffers(long maxHeapUsage) {
+ public EntryBuffers(PipelineController controller, long maxHeapUsage) {
+ this.controller = controller;
this.maxHeapUsage = maxHeapUsage;
}
@@ -902,7 +913,7 @@ public class WALSplitter {
* @throws InterruptedException
* @throws IOException
*/
- void appendEntry(Entry entry) throws InterruptedException, IOException {
+ public void appendEntry(Entry entry) throws InterruptedException, IOException {
WALKey key = entry.getKey();
RegionEntryBuffer buffer;
@@ -917,15 +928,15 @@ public class WALSplitter {
}
// If we crossed the chunk threshold, wait for more space to be available
- synchronized (dataAvailable) {
+ synchronized (controller.dataAvailable) {
totalBuffered += incrHeap;
- while (totalBuffered > maxHeapUsage && thrown.get() == null) {
+ while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) {
LOG.debug("Used " + totalBuffered + " bytes of buffered edits, waiting for IO threads...");
- dataAvailable.wait(2000);
+ controller.dataAvailable.wait(2000);
}
- dataAvailable.notifyAll();
+ controller.dataAvailable.notifyAll();
}
- checkForErrors();
+ controller.checkForErrors();
}
/**
@@ -958,16 +969,30 @@ public class WALSplitter {
}
long size = buffer.heapSize();
- synchronized (dataAvailable) {
+ synchronized (controller.dataAvailable) {
totalBuffered -= size;
// We may unblock writers
- dataAvailable.notifyAll();
+ controller.dataAvailable.notifyAll();
}
}
synchronized boolean isRegionCurrentlyWriting(byte[] region) {
return currentlyWriting.contains(region);
}
+
+ public void waitUntilDrained() {
+ synchronized (controller.dataAvailable) {
+ while (totalBuffered > 0) {
+ try {
+ controller.dataAvailable.wait(2000);
+ } catch (InterruptedException e) {
+ LOG.warn("Got intrerrupted while waiting for EntryBuffers is drained");
+ Thread.interrupted();
+ break;
+ }
+ }
+ }
+ }
}
/**
@@ -976,7 +1001,7 @@ public class WALSplitter {
* share a single byte array instance for the table and region name.
* Also tracks memory usage of the accumulated edits.
*/
- static class RegionEntryBuffer implements HeapSize {
+ public static class RegionEntryBuffer implements HeapSize {
long heapInBuffer = 0;
List<Entry> entryBuffer;
TableName tableName;
@@ -1008,14 +1033,30 @@ public class WALSplitter {
public long heapSize() {
return heapInBuffer;
}
+
+ public byte[] getEncodedRegionName() {
+ return encodedRegionName;
+ }
+
+ public List<Entry> getEntryBuffer() {
+ return entryBuffer;
+ }
+
+ public TableName getTableName() {
+ return tableName;
+ }
}
- class WriterThread extends Thread {
+ public static class WriterThread extends Thread {
private volatile boolean shouldStop = false;
+ private PipelineController controller;
+ private EntryBuffers entryBuffers;
private OutputSink outputSink = null;
- WriterThread(OutputSink sink, int i) {
+ WriterThread(PipelineController controller, EntryBuffers entryBuffers, OutputSink sink, int i){
super(Thread.currentThread().getName() + "-Writer-" + i);
+ this.controller = controller;
+ this.entryBuffers = entryBuffers;
outputSink = sink;
}
@@ -1025,7 +1066,7 @@ public class WALSplitter {
doRun();
} catch (Throwable t) {
LOG.error("Exiting thread", t);
- writerThreadError(t);
+ controller.writerThreadError(t);
}
}
@@ -1035,12 +1076,12 @@ public class WALSplitter {
RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
if (buffer == null) {
// No data currently available, wait on some more to show up
- synchronized (dataAvailable) {
+ synchronized (controller.dataAvailable) {
if (shouldStop && !this.outputSink.flush()) {
return;
}
try {
- dataAvailable.wait(500);
+ controller.dataAvailable.wait(500);
} catch (InterruptedException ie) {
if (!shouldStop) {
throw new RuntimeException(ie);
@@ -1064,9 +1105,9 @@ public class WALSplitter {
}
void finish() {
- synchronized (dataAvailable) {
+ synchronized (controller.dataAvailable) {
shouldStop = true;
- dataAvailable.notifyAll();
+ controller.dataAvailable.notifyAll();
}
}
}
@@ -1075,7 +1116,10 @@ public class WALSplitter {
* The following class is an abstraction class to provide a common interface to support both
* existing recovered edits file sink and region server WAL edits replay sink
*/
- abstract class OutputSink {
+ public static abstract class OutputSink {
+
+ protected PipelineController controller;
+ protected EntryBuffers entryBuffers;
protected Map<byte[], SinkWriter> writers = Collections
.synchronizedMap(new TreeMap<byte[], SinkWriter>(Bytes.BYTES_COMPARATOR));
@@ -1101,8 +1145,10 @@ public class WALSplitter {
protected List<Path> splits = null;
- public OutputSink(int numWriters) {
+ public OutputSink(PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
numThreads = numWriters;
+ this.controller = controller;
+ this.entryBuffers = entryBuffers;
}
void setReporter(CancelableProgressable reporter) {
@@ -1112,9 +1158,9 @@ public class WALSplitter {
/**
* Start the threads that will pump data from the entryBuffers to the output files.
*/
- synchronized void startWriterThreads() {
+ public synchronized void startWriterThreads() {
for (int i = 0; i < numThreads; i++) {
- WriterThread t = new WriterThread(this, i);
+ WriterThread t = new WriterThread(controller, entryBuffers, this, i);
t.start();
writerThreads.add(t);
}
@@ -1173,34 +1219,34 @@ public class WALSplitter {
throw iie;
}
}
- checkForErrors();
+ controller.checkForErrors();
LOG.info("Split writers finished");
return (!progress_failed);
}
- abstract List<Path> finishWritingAndClose() throws IOException;
+ public abstract List<Path> finishWritingAndClose() throws IOException;
/**
* @return a map from encoded region ID to the number of edits written out for that region.
*/
- abstract Map<byte[], Long> getOutputCounts();
+ public abstract Map<byte[], Long> getOutputCounts();
/**
* @return number of regions we've recovered
*/
- abstract int getNumberOfRecoveredRegions();
+ public abstract int getNumberOfRecoveredRegions();
/**
* @param buffer A WAL Edit Entry
* @throws IOException
*/
- abstract void append(RegionEntryBuffer buffer) throws IOException;
+ public abstract void append(RegionEntryBuffer buffer) throws IOException;
/**
* WriterThread calls this function to flush any remaining edits buffered internally before close
* @return true when underlying sink has something to flush
*/
- protected boolean flush() throws IOException {
+ public boolean flush() throws IOException {
return false;
}
}
@@ -1210,13 +1256,14 @@ public class WALSplitter {
*/
class LogRecoveredEditsOutputSink extends OutputSink {
- public LogRecoveredEditsOutputSink(int numWriters) {
+ public LogRecoveredEditsOutputSink(PipelineController controller, EntryBuffers entryBuffers,
+ int numWriters) {
// More threads could potentially write faster at the expense
// of causing more disk seeks as the logs are split.
// After a certain number of threads (probably around 3) the
// process will be bound on the reader in the current
// implementation anyway.
- super(numWriters);
+ super(controller, entryBuffers, numWriters);
}
/**
@@ -1224,7 +1271,7 @@ public class WALSplitter {
* @throws IOException
*/
@Override
- List<Path> finishWritingAndClose() throws IOException {
+ public List<Path> finishWritingAndClose() throws IOException {
boolean isSuccessful = false;
List<Path> result = null;
try {
@@ -1442,7 +1489,7 @@ public class WALSplitter {
}
@Override
- void append(RegionEntryBuffer buffer) throws IOException {
+ public void append(RegionEntryBuffer buffer) throws IOException {
List<Entry> entries = buffer.entryBuffer;
if (entries.isEmpty()) {
LOG.warn("got an empty buffer, skipping");
@@ -1483,7 +1530,7 @@ public class WALSplitter {
* @return a map from encoded region ID to the number of edits written out for that region.
*/
@Override
- Map<byte[], Long> getOutputCounts() {
+ public Map<byte[], Long> getOutputCounts() {
TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
synchronized (writers) {
for (Map.Entry<byte[], SinkWriter> entry : writers.entrySet()) {
@@ -1494,7 +1541,7 @@ public class WALSplitter {
}
@Override
- int getNumberOfRecoveredRegions() {
+ public int getNumberOfRecoveredRegions() {
return writers.size();
}
}
@@ -1502,7 +1549,7 @@ public class WALSplitter {
/**
* Class wraps the actual writer which writes data out and related statistics
*/
- private abstract static class SinkWriter {
+ public abstract static class SinkWriter {
/* Count of edits written to this path */
long editsWritten = 0;
/* Number of nanos spent writing to this log */
@@ -1563,17 +1610,18 @@ public class WALSplitter {
private LogRecoveredEditsOutputSink logRecoveredEditsOutputSink;
private boolean hasEditsInDisablingOrDisabledTables = false;
- public LogReplayOutputSink(int numWriters) {
- super(numWriters);
- this.waitRegionOnlineTimeOut =
- conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT,
- ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT);
- this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(numWriters);
+ public LogReplayOutputSink(PipelineController controller, EntryBuffers entryBuffers,
+ int numWriters) {
+ super(controller, entryBuffers, numWriters);
+ this.waitRegionOnlineTimeOut = conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT,
+ ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT);
+ this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(controller,
+ entryBuffers, numWriters);
this.logRecoveredEditsOutputSink.setReporter(reporter);
}
@Override
- void append(RegionEntryBuffer buffer) throws IOException {
+ public void append(RegionEntryBuffer buffer) throws IOException {
List<Entry> entries = buffer.entryBuffer;
if (entries.isEmpty()) {
LOG.warn("got an empty buffer, skipping");
@@ -1889,7 +1937,7 @@ public class WALSplitter {
}
@Override
- protected boolean flush() throws IOException {
+ public boolean flush() throws IOException {
String curLoc = null;
int curSize = 0;
List<Pair<HRegionLocation, Entry>> curQueue = null;
@@ -1910,8 +1958,8 @@ public class WALSplitter {
if (curSize > 0) {
this.processWorkItems(curLoc, curQueue);
// We should already have control of the monitor; ensure this is the case.
- synchronized(dataAvailable) {
- dataAvailable.notifyAll();
+ synchronized(controller.dataAvailable) {
+ controller.dataAvailable.notifyAll();
}
return true;
}
@@ -1923,7 +1971,7 @@ public class WALSplitter {
}
@Override
- List<Path> finishWritingAndClose() throws IOException {
+ public List<Path> finishWritingAndClose() throws IOException {
try {
if (!finishWriting()) {
return null;
@@ -1998,7 +2046,7 @@ public class WALSplitter {
}
@Override
- Map<byte[], Long> getOutputCounts() {
+ public Map<byte[], Long> getOutputCounts() {
TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
synchronized (writers) {
for (Map.Entry<String, RegionServerWriter> entry : writers.entrySet()) {
@@ -2009,7 +2057,7 @@ public class WALSplitter {
}
@Override
- int getNumberOfRecoveredRegions() {
+ public int getNumberOfRecoveredRegions() {
return this.recoveredRegions.size();
}
@@ -2115,12 +2163,13 @@ public class WALSplitter {
* @param cells
* @param logEntry pair of WALKey and WALEdit instance stores WALKey and WALEdit instances
* extracted from the passed in WALEntry.
+ * @param durability the durability to set on the mutations replayed from the WALEntry
* @return list of Pair<MutationType, Mutation> to be replayed
* @throws IOException
*/
public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, CellScanner cells,
- Pair<WALKey, WALEdit> logEntry) throws IOException {
-
+ Pair<WALKey, WALEdit> logEntry, Durability durability)
+ throws IOException {
if (entry == null) {
// return an empty array
return new ArrayList<MutationReplay>();
@@ -2168,6 +2217,9 @@ public class WALSplitter {
} else {
((Put) m).add(cell);
}
+ if (m != null) {
+ m.setDurability(durability);
+ }
previousCell = cell;
}
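
The WALSplitter changes above are a pure refactor: the shared error and wait/notify state
moves into PipelineController, and EntryBuffers, RegionEntryBuffer, WriterThread and
OutputSink become public static so the producer/consumer pipeline can be assembled outside
of log splitting, which is how the region replica replication endpoint reuses it. A
minimal sketch of a custom sink wired into that pipeline, assuming the four abstract
methods visible in this diff are the only ones a subclass must implement:

  import java.io.IOException;
  import java.util.Collections;
  import java.util.List;
  import java.util.Map;
  import java.util.TreeMap;

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.util.Bytes;
  import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers;
  import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink;
  import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
  import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;

  // hypothetical sink that only counts edits per region, to show how the
  // now-public pipeline pieces compose
  public class CountingOutputSink extends OutputSink {
    private final Map<byte[], Long> counts =
        new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);

    public CountingOutputSink(PipelineController controller, EntryBuffers buffers,
        int numWriters) {
      super(controller, buffers, numWriters);
    }

    @Override
    public void append(RegionEntryBuffer buffer) throws IOException {
      synchronized (counts) {
        byte[] region = buffer.getEncodedRegionName();
        Long prev = counts.get(region);
        counts.put(region, (prev == null ? 0L : prev) + buffer.getEntryBuffer().size());
      }
    }

    @Override
    public List<Path> finishWritingAndClose() throws IOException {
      // a real sink would drain its writer threads and close its writers here
      return Collections.emptyList();
    }

    @Override
    public Map<byte[], Long> getOutputCounts() {
      TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
      synchronized (counts) {
        ret.putAll(counts);
      }
      return ret;
    }

    @Override
    public int getNumberOfRecoveredRegions() {
      return counts.size();
    }
  }

  // Wiring it up mirrors the WALSplitter constructor above:
  //   PipelineController controller = new PipelineController();
  //   EntryBuffers buffers = new EntryBuffers(controller, 128 * 1024 * 1024);
  //   OutputSink sink = new CountingOutputSink(controller, buffers, 3);
  //   sink.startWriterThreads();
  //   ... buffers.appendEntry(entry) for each WAL entry, then
  //   sink.finishWritingAndClose();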
http://git-wip-us.apache.org/repos/asf/hbase/blob/055f5a95/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index b6e7df9..9ed121d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -63,6 +64,7 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
@@ -1773,6 +1775,18 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
getHBaseAdmin().deleteTable(tableName);
}
+ /**
+ * Drop the given table if it exists; a missing table is silently ignored
+ * @param tableName table to drop
+ */
+ public void deleteTableIfAny(TableName tableName) throws IOException {
+ try {
+ deleteTable(tableName);
+ } catch (TableNotFoundException e) {
+ // ignore
+ }
+ }
+
// ==========================================================================
// Canned table and table descriptor creation
// TODO replace HBaseTestCase
@@ -2085,7 +2099,8 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
return rowCount;
}
- public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow) throws IOException {
+ public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow)
+ throws IOException {
for (int i = startRow; i < endRow; i++) {
byte[] data = Bytes.toBytes(String.valueOf(i));
Put put = new Put(data);
@@ -2094,7 +2109,23 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
}
}
- public void deleteNumericRows(final Table t, final byte[] f, int startRow, int endRow) throws IOException {
+ public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow)
+ throws IOException {
+ for (int i = startRow; i < endRow; i++) {
+ String failMsg = "Failed verification of row: " + i;
+ byte[] data = Bytes.toBytes(String.valueOf(i));
+ Result result = region.get(new Get(data));
+ assertTrue(failMsg, result.containsColumn(f, null));
+ assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
+ Cell cell = result.getColumnLatestCell(f, null);
+ assertTrue(failMsg,
+ Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(),
+ cell.getValueLength()));
+ }
+ }
+
+ public void deleteNumericRows(final HTable t, final byte[] f, int startRow, int endRow)
+ throws IOException {
for (int i = startRow; i < endRow; i++) {
byte[] data = Bytes.toBytes(String.valueOf(i));
Delete delete = new Delete(data);
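
The new test helpers above follow one convention: loadNumericRows() writes rows whose key
and value are both the stringified index, verifyNumericRows() asserts them back out of a
region, and deleteTableIfAny() makes cleanup idempotent. A sketch of a mini-cluster flow
built on them (the table and family names are invented):

  import org.apache.hadoop.hbase.HBaseTestingUtility;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.Table;
  import org.apache.hadoop.hbase.util.Bytes;

  public class NumericRowsSketch {
    public static void main(String[] args) throws Exception {
      HBaseTestingUtility htu = new HBaseTestingUtility();
      htu.startMiniCluster();
      TableName tableName = TableName.valueOf("numeric_rows_sketch");
      byte[] family = Bytes.toBytes("f");
      try {
        Table table = htu.createTable(tableName, family);
        // writes rows "0".."99"; the row key doubles as the cell value
        htu.loadNumericRows(table, family, 0, 100);
        // verifyNumericRows(region, family, 0, 100) would then read the same
        // rows back directly from an HRegion and assert on the values
      } finally {
        // safe even if table creation failed half-way:
        // TableNotFoundException is swallowed
        htu.deleteTableIfAny(tableName);
        htu.shutdownMiniCluster();
      }
    }
  }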
http://git-wip-us.apache.org/repos/asf/hbase/blob/055f5a95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
index afeec4d..9220a49 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.regionserver;
+import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.*;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
@@ -33,7 +34,6 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TestMetaTableAccessor;
import org.apache.hadoop.hbase.client.Consistency;
@@ -45,7 +45,6 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
@@ -121,61 +120,9 @@ public class TestRegionReplicas {
return HTU.getMiniHBaseCluster().getRegionServer(0);
}
- private void openRegion(HRegionInfo hri) throws Exception {
- ZKAssign.createNodeOffline(HTU.getZooKeeperWatcher(), hri, getRS().getServerName());
- // first version is '0'
- AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, 0, null, null);
- AdminProtos.OpenRegionResponse responseOpen = getRS().getRSRpcServices().openRegion(null, orr);
- Assert.assertTrue(responseOpen.getOpeningStateCount() == 1);
- Assert.assertTrue(responseOpen.getOpeningState(0).
- equals(AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED));
- checkRegionIsOpened(hri.getEncodedName());
- }
-
- private void closeRegion(HRegionInfo hri) throws Exception {
- ZKAssign.createNodeClosing(HTU.getZooKeeperWatcher(), hri, getRS().getServerName());
-
- AdminProtos.CloseRegionRequest crr = RequestConverter.buildCloseRegionRequest(getRS().getServerName(),
- hri.getEncodedName(), true);
- AdminProtos.CloseRegionResponse responseClose = getRS().getRSRpcServices().closeRegion(null, crr);
- Assert.assertTrue(responseClose.getClosed());
-
- checkRegionIsClosed(hri.getEncodedName());
-
- ZKAssign.deleteClosedNode(HTU.getZooKeeperWatcher(), hri.getEncodedName(), getRS().getServerName());
- }
-
- private void checkRegionIsOpened(String encodedRegionName) throws Exception {
-
- while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
- Thread.sleep(1);
- }
-
- Assert.assertTrue(getRS().getRegionByEncodedName(encodedRegionName).isAvailable());
-
- Assert.assertTrue(
- ZKAssign.deleteOpenedNode(HTU.getZooKeeperWatcher(), encodedRegionName, getRS().getServerName()));
- }
-
-
- private void checkRegionIsClosed(String encodedRegionName) throws Exception {
-
- while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
- Thread.sleep(1);
- }
-
- try {
- Assert.assertFalse(getRS().getRegionByEncodedName(encodedRegionName).isAvailable());
- } catch (NotServingRegionException expected) {
- // That's how it work: if the region is closed we have an exception.
- }
-
- // We don't delete the znode here, because there is not always a znode.
- }
-
@Test(timeout = 60000)
public void testOpenRegionReplica() throws Exception {
- openRegion(hriSecondary);
+ openRegion(HTU, getRS(), hriSecondary);
try {
//load some data to primary
HTU.loadNumericRows(table, f, 0, 1000);
@@ -184,22 +131,22 @@ public class TestRegionReplicas {
Assert.assertEquals(1000, HTU.countRows(table));
} finally {
HTU.deleteNumericRows(table, f, 0, 1000);
- closeRegion(hriSecondary);
+ closeRegion(HTU, getRS(), hriSecondary);
}
}
/** Tests that the meta location is saved for secondary regions */
@Test(timeout = 60000)
public void testRegionReplicaUpdatesMetaLocation() throws Exception {
- openRegion(hriSecondary);
+ openRegion(HTU, getRS(), hriSecondary);
Table meta = null;
try {
- meta = new HTable(HTU.getConfiguration(), TableName.META_TABLE_NAME);
+ meta = HTU.getConnection().getTable(TableName.META_TABLE_NAME);
TestMetaTableAccessor.assertMetaLocation(meta, hriPrimary.getRegionName()
, getRS().getServerName(), -1, 1, false);
} finally {
if (meta != null ) meta.close();
- closeRegion(hriSecondary);
+ closeRegion(HTU, getRS(), hriSecondary);
}
}
@@ -213,7 +160,7 @@ public class TestRegionReplicas {
// flush so that region replica can read
getRS().getRegionByEncodedName(hriPrimary.getEncodedName()).flushcache();
- openRegion(hriSecondary);
+ openRegion(HTU, getRS(), hriSecondary);
// first try directly against region
HRegion region = getRS().getFromOnlineRegions(hriSecondary.getEncodedName());
@@ -222,7 +169,7 @@ public class TestRegionReplicas {
assertGetRpc(hriSecondary, 42, true);
} finally {
HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
- closeRegion(hriSecondary);
+ closeRegion(HTU, getRS(), hriSecondary);
}
}
@@ -236,7 +183,7 @@ public class TestRegionReplicas {
// flush so that region replica can read
getRS().getRegionByEncodedName(hriPrimary.getEncodedName()).flushcache();
- openRegion(hriSecondary);
+ openRegion(HTU, getRS(), hriSecondary);
// try directly Get against region replica
byte[] row = Bytes.toBytes(String.valueOf(42));
@@ -247,7 +194,7 @@ public class TestRegionReplicas {
Assert.assertArrayEquals(row, result.getValue(f, null));
} finally {
HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
- closeRegion(hriSecondary);
+ closeRegion(HTU, getRS(), hriSecondary);
}
}
@@ -263,7 +210,8 @@ public class TestRegionReplicas {
}
// build a mock rpc
- private void assertGetRpc(HRegionInfo info, int value, boolean expect) throws IOException, ServiceException {
+ private void assertGetRpc(HRegionInfo info, int value, boolean expect)
+ throws IOException, ServiceException {
byte[] row = Bytes.toBytes(String.valueOf(value));
Get get = new Get(row);
ClientProtos.GetRequest getReq = RequestConverter.buildGetRequest(info.getRegionName(), get);
@@ -286,13 +234,14 @@ public class TestRegionReplicas {
// enable store file refreshing
final int refreshPeriod = 2000; // 2 sec
HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 100);
- HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, refreshPeriod);
+ HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
+ refreshPeriod);
// restart the region server so that it starts the refresher chore
restartRegionServer();
try {
LOG.info("Opening the secondary region " + hriSecondary.getEncodedName());
- openRegion(hriSecondary);
+ openRegion(HTU, getRS(), hriSecondary);
//load some data to primary
LOG.info("Loading data to primary region");
@@ -348,7 +297,7 @@ public class TestRegionReplicas {
} finally {
HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
- closeRegion(hriSecondary);
+ closeRegion(HTU, getRS(), hriSecondary);
}
}
@@ -365,7 +314,7 @@ public class TestRegionReplicas {
final int startKey = 0, endKey = 1000;
try {
- openRegion(hriSecondary);
+ openRegion(HTU, getRS(), hriSecondary);
//load some data to primary so that reader won't fail
HTU.loadNumericRows(table, f, startKey, endKey);
@@ -429,13 +378,13 @@ public class TestRegionReplicas {
// whether to do a close and open
if (random.nextInt(10) == 0) {
try {
- closeRegion(hriSecondary);
+ closeRegion(HTU, getRS(), hriSecondary);
} catch (Exception ex) {
LOG.warn("Failed closing the region " + hriSecondary + " " + StringUtils.stringifyException(ex));
exceptions[2].compareAndSet(null, ex);
}
try {
- openRegion(hriSecondary);
+ openRegion(HTU, getRS(), hriSecondary);
} catch (Exception ex) {
LOG.warn("Failed opening the region " + hriSecondary + " " + StringUtils.stringifyException(ex));
exceptions[2].compareAndSet(null, ex);
@@ -469,7 +418,7 @@ public class TestRegionReplicas {
}
} finally {
HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, startKey, endKey);
- closeRegion(hriSecondary);
+ closeRegion(HTU, getRS(), hriSecondary);
}
}
@@ -481,7 +430,7 @@ public class TestRegionReplicas {
try {
LOG.info("Opening the secondary region " + hriSecondary.getEncodedName());
- openRegion(hriSecondary);
+ openRegion(HTU, getRS(), hriSecondary);
// load some data to primary
LOG.info("Loading data to primary region");
@@ -528,7 +477,7 @@ public class TestRegionReplicas {
Assert.assertEquals(4498500, sum);
} finally {
HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
- closeRegion(hriSecondary);
+ closeRegion(HTU, getRS(), hriSecondary);
}
}
}
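
The tests above all share one recipe: open the secondary replica with the now-shared
TestRegionServerNoMaster helpers, write through the primary, flush so the data reaches the
store files the secondary reads from, query the secondary, then clean up. Condensed into a
hypothetical helper (every parameter is a fixture the test class already builds):

  import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.closeRegion;
  import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.openRegion;

  import org.apache.hadoop.hbase.HBaseTestingUtility;
  import org.apache.hadoop.hbase.HRegionInfo;
  import org.apache.hadoop.hbase.client.HTable;
  import org.apache.hadoop.hbase.regionserver.HRegionServer;

  public class SecondaryReadSketch {
    static void loadFlushAndCleanup(HBaseTestingUtility HTU, HRegionServer rs,
        HTable table, byte[] f, HRegionInfo hriPrimary, HRegionInfo hriSecondary)
        throws Exception {
      openRegion(HTU, rs, hriSecondary);
      try {
        HTU.loadNumericRows(table, f, 0, 1000);  // write through the primary
        // flush so the secondary, which reads the primary's store files,
        // can see the data
        rs.getRegionByEncodedName(hriPrimary.getEncodedName()).flushcache();
        // ... reads against hriSecondary would go here ...
      } finally {
        HTU.deleteNumericRows(table, f, 0, 1000);
        closeRegion(HTU, rs, hriSecondary);
      }
    }
  }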
http://git-wip-us.apache.org/repos/asf/hbase/blob/055f5a95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
index 381feb7..11fa938 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
@@ -133,59 +133,69 @@ public class TestRegionServerNoMaster {
return HTU.getHBaseCluster().getLiveRegionServerThreads().get(0).getRegionServer();
}
-
- /**
- * Reopen the region. Reused in multiple tests as we always leave the region open after a test.
- */
- private void reopenRegion() throws Exception {
- // We reopen. We need a ZK node here, as a open is always triggered by a master.
- ZKAssign.createNodeOffline(HTU.getZooKeeperWatcher(), hri, getRS().getServerName());
+ public static void openRegion(HBaseTestingUtility HTU, HRegionServer rs, HRegionInfo hri)
+ throws Exception {
+ ZKAssign.createNodeOffline(HTU.getZooKeeperWatcher(), hri, rs.getServerName());
// first version is '0'
AdminProtos.OpenRegionRequest orr =
- RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, 0, null, null);
- AdminProtos.OpenRegionResponse responseOpen = getRS().rpcServices.openRegion(null, orr);
+ RequestConverter.buildOpenRegionRequest(rs.getServerName(), hri, 0, null, null);
+ AdminProtos.OpenRegionResponse responseOpen = rs.rpcServices.openRegion(null, orr);
+
Assert.assertTrue(responseOpen.getOpeningStateCount() == 1);
Assert.assertTrue(responseOpen.getOpeningState(0).
equals(AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED));
- checkRegionIsOpened();
+ checkRegionIsOpened(HTU, rs, hri);
}
- private void checkRegionIsOpened() throws Exception {
-
- while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
+ public static void checkRegionIsOpened(HBaseTestingUtility HTU, HRegionServer rs,
+ HRegionInfo hri) throws Exception {
+ while (!rs.getRegionsInTransitionInRS().isEmpty()) {
Thread.sleep(1);
}
- Assert.assertTrue(getRS().getRegion(regionName).isAvailable());
+ Assert.assertTrue(rs.getRegion(hri.getRegionName()).isAvailable());
Assert.assertTrue(
ZKAssign.deleteOpenedNode(HTU.getZooKeeperWatcher(), hri.getEncodedName(),
- getRS().getServerName()));
+ rs.getServerName()));
}
+ public static void closeRegion(HBaseTestingUtility HTU, HRegionServer rs, HRegionInfo hri)
+ throws Exception {
+ ZKAssign.createNodeClosing(HTU.getZooKeeperWatcher(), hri, rs.getServerName());
+ AdminProtos.CloseRegionRequest crr = RequestConverter.buildCloseRegionRequest(
+ rs.getServerName(), hri.getEncodedName(), true);
+ AdminProtos.CloseRegionResponse responseClose = rs.rpcServices.closeRegion(null, crr);
+ Assert.assertTrue(responseClose.getClosed());
+ checkRegionIsClosed(HTU, rs, hri);
+ ZKAssign.deleteClosedNode(HTU.getZooKeeperWatcher(), hri.getEncodedName(), null);
+ }
- private void checkRegionIsClosed() throws Exception {
-
- while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
+ public static void checkRegionIsClosed(HBaseTestingUtility HTU, HRegionServer rs,
+ HRegionInfo hri) throws Exception {
+ while (!rs.getRegionsInTransitionInRS().isEmpty()) {
Thread.sleep(1);
}
+ boolean exception = false;
try {
- Assert.assertFalse(getRS().getRegion(regionName).isAvailable());
+ while ((rs.getRegion(hri.getRegionName()).isAvailable())) {
+ Thread.sleep(10);
+ }
} catch (NotServingRegionException expected) {
+ exception = true;
// That's how it works: if the region is closed we get an exception.
}
-
+ Assert.assertTrue(exception);
// We don't delete the znode here, because there is not always a znode.
}
-
/**
* Close the region without using ZK
*/
- private void closeNoZK() throws Exception {
+ private void closeRegionNoZK() throws Exception {
// no transition in ZK
AdminProtos.CloseRegionRequest crr =
RequestConverter.buildCloseRegionRequest(getRS().getServerName(), regionName, false);
@@ -193,14 +203,14 @@ public class TestRegionServerNoMaster {
Assert.assertTrue(responseClose.getClosed());
// now waiting & checking. After a while, the transition should be done and the region closed
- checkRegionIsClosed();
+ checkRegionIsClosed(HTU, getRS(), hri);
}
@Test(timeout = 60000)
public void testCloseByRegionServer() throws Exception {
- closeNoZK();
- reopenRegion();
+ closeRegionNoZK();
+ openRegion(HTU, getRS(), hri);
}
@Test(timeout = 60000)
@@ -231,12 +241,12 @@ public class TestRegionServerNoMaster {
AdminProtos.CloseRegionResponse responseClose = getRS().rpcServices.closeRegion(null, crr);
Assert.assertTrue(responseClose.getClosed());
- checkRegionIsClosed();
+ checkRegionIsClosed(HTU, getRS(), hri);
ZKAssign.deleteClosedNode(HTU.getZooKeeperWatcher(), hri.getEncodedName(),
getRS().getServerName());
- reopenRegion();
+ openRegion(HTU, getRS(), hri);
}
/**
@@ -253,8 +263,8 @@ public class TestRegionServerNoMaster {
public void testMultipleOpen() throws Exception {
// We close
- closeNoZK();
- checkRegionIsClosed();
+ closeRegionNoZK();
+ checkRegionIsClosed(HTU, getRS(), hri);
// We reopen. We need a ZK node here, as a open is always triggered by a master.
ZKAssign.createNodeOffline(HTU.getZooKeeperWatcher(), hri, getRS().getServerName());
@@ -273,7 +283,7 @@ public class TestRegionServerNoMaster {
);
}
- checkRegionIsOpened();
+ checkRegionIsOpened(HTU, getRS(), hri);
}
@Test
@@ -317,14 +327,14 @@ public class TestRegionServerNoMaster {
}
}
- checkRegionIsClosed();
+ checkRegionIsClosed(HTU, getRS(), hri);
Assert.assertTrue(
ZKAssign.deleteClosedNode(HTU.getZooKeeperWatcher(), hri.getEncodedName(),
getRS().getServerName())
);
- reopenRegion();
+ openRegion(HTU, getRS(), hri);
}
/**
@@ -333,8 +343,8 @@ public class TestRegionServerNoMaster {
@Test(timeout = 60000)
public void testCancelOpeningWithoutZK() throws Exception {
// We close
- closeNoZK();
- checkRegionIsClosed();
+ closeRegionNoZK();
+ checkRegionIsClosed(HTU, getRS(), hri);
// Let do the initial steps, without having a handler
ZKAssign.createNodeOffline(HTU.getZooKeeperWatcher(), hri, getRS().getServerName());
@@ -369,7 +379,7 @@ public class TestRegionServerNoMaster {
csm.getOpenRegionCoordination(), zkCrd));
// The open handler should have removed the region from RIT but kept the region closed
- checkRegionIsClosed();
+ checkRegionIsClosed(HTU, getRS(), hri);
// The open handler should have updated the value in ZK.
Assert.assertTrue(ZKAssign.deleteNode(
@@ -377,7 +387,7 @@ public class TestRegionServerNoMaster {
EventType.RS_ZK_REGION_FAILED_OPEN, 1)
);
- reopenRegion();
+ openRegion(HTU, getRS(), hri);
}
/**
@@ -387,8 +397,8 @@ public class TestRegionServerNoMaster {
@Test(timeout = 60000)
public void testCancelOpeningWithZK() throws Exception {
// We close
- closeNoZK();
- checkRegionIsClosed();
+ closeRegionNoZK();
+ checkRegionIsClosed(HTU, getRS(), hri);
// Let do the initial steps, without having a handler
getRS().getRegionsInTransitionInRS().put(hri.getEncodedNameAsBytes(), Boolean.TRUE);
@@ -434,12 +444,12 @@ public class TestRegionServerNoMaster {
csm.getOpenRegionCoordination(), zkCrd));
// The open handler should have removed the region from RIT but kept the region closed
- checkRegionIsClosed();
+ checkRegionIsClosed(HTU, getRS(), hri);
// We should not find any znode here.
Assert.assertEquals(-1, ZKAssign.getVersion(HTU.getZooKeeperWatcher(), hri));
- reopenRegion();
+ openRegion(HTU, getRS(), hri);
}
/**
@@ -463,7 +473,7 @@ public class TestRegionServerNoMaster {
}
//actual close
- closeNoZK();
+ closeRegionNoZK();
try {
AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(
earlierServerName, hri, 0, null, null);
@@ -473,7 +483,7 @@ public class TestRegionServerNoMaster {
Assert.assertTrue(se.getCause() instanceof IOException);
Assert.assertTrue(se.getCause().getMessage().contains("This RPC was intended for a different server"));
} finally {
- reopenRegion();
+ openRegion(HTU, getRS(), hri);
}
}
}