You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/04/17 06:48:32 UTC
[22/50] [abbrv] hbase git commit: HBASE-20377 Deal with table in
enabling and disabling state when modifying serial replication peer
HBASE-20377 Deal with table in enabling and disabling state when modifying serial replication peer
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5a633adf
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5a633adf
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5a633adf
Branch: refs/heads/HBASE-19064
Commit: 5a633adffead3b979f6e1a607994409978b0ea74
Parents: 826909a
Author: zhangduo <zh...@apache.org>
Authored: Fri Apr 13 16:21:03 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Fri Apr 13 20:33:29 2018 +0800
----------------------------------------------------------------------
.../apache/hadoop/hbase/client/TableState.java | 14 ++++
.../master/procedure/DisableTableProcedure.java | 6 +-
.../master/replication/ModifyPeerProcedure.java | 84 +++++++++++++-------
.../TestAddToSerialReplicationPeer.java | 74 +++++++++++++++++
4 files changed, 146 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/5a633adf/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableState.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableState.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableState.java
index cc3b765..40612e9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableState.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableState.java
@@ -104,6 +104,13 @@ public class TableState {
}
/**
+ * @return True if table is {@link State#ENABLING}.
+ */
+ public boolean isEnabling() {
+ return isInStates(State.ENABLING);
+ }
+
+ /**
* @return True if {@link State#ENABLED} or {@link State#ENABLING}
*/
public boolean isEnabledOrEnabling() {
@@ -118,6 +125,13 @@ public class TableState {
}
/**
+ * @return True if table is {@link State#DISABLING}.
+ */
+ public boolean isDisabling() {
+ return isInStates(State.DISABLING);
+ }
+
+ /**
* @return True if {@link State#DISABLED} or {@link State#DISABLED}
*/
public boolean isDisabledOrDisabling() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/5a633adf/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java
index f5caff7..685a73e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
@@ -107,7 +108,7 @@ public class DisableTableProcedure
break;
case DISABLE_TABLE_MARK_REGIONS_OFFLINE:
addChildProcedure(env.getAssignmentManager().createUnassignProcedures(tableName));
- setNextState(DisableTableState.DISABLE_TABLE_SET_DISABLED_TABLE_STATE);
+ setNextState(DisableTableState.DISABLE_TABLE_ADD_REPLICATION_BARRIER);
break;
case DISABLE_TABLE_ADD_REPLICATION_BARRIER:
if (env.getMasterServices().getTableDescriptors().get(tableName)
@@ -119,7 +120,8 @@ public class DisableTableProcedure
.getRegionsOfTable(tableName)) {
long maxSequenceId =
WALSplitter.getMaxRegionSequenceId(mfs.getFileSystem(), mfs.getRegionDir(region));
- mutator.mutate(MetaTableAccessor.makePutForReplicationBarrier(region, maxSequenceId,
+ long openSeqNum = maxSequenceId > 0 ? maxSequenceId + 1 : HConstants.NO_SEQNUM;
+ mutator.mutate(MetaTableAccessor.makePutForReplicationBarrier(region, openSeqNum,
EnvironmentEdgeManager.currentTime()));
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5a633adf/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
index 8bedeff..0b9efce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
@@ -18,17 +18,16 @@
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.TableStateManager.TableStateNotFoundException;
-import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
@@ -38,7 +37,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,6 +54,9 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
protected static final int UPDATE_LAST_SEQ_ID_BATCH_SIZE = 1000;
+ // The sleep interval when waiting for a table to be enabled or disabled.
+ protected static final int SLEEP_INTERVAL_MS = 1000;
+
protected ModifyPeerProcedure() {
}
@@ -126,6 +127,27 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
throw new UnsupportedOperationException();
}
+ // If the table is in enabling state, we need to wait until it is enabled and then reopen all its
+ // regions.
+ private boolean needReopen(TableStateManager tsm, TableName tn) throws IOException {
+ for (;;) {
+ try {
+ TableState state = tsm.getTableState(tn);
+ if (state.isEnabled()) {
+ return true;
+ }
+ if (!state.isEnabling()) {
+ return false;
+ }
+ Thread.sleep(SLEEP_INTERVAL_MS);
+ } catch (TableStateNotFoundException e) {
+ return false;
+ } catch (InterruptedException e) {
+ throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e);
+ }
+ }
+ }
+
private void reopenRegions(MasterProcedureEnv env) throws IOException {
ReplicationPeerConfig peerConfig = getNewPeerConfig();
ReplicationPeerConfig oldPeerConfig = getOldPeerConfig();
@@ -142,15 +164,10 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
ReplicationUtils.contains(oldPeerConfig, tn)) {
continue;
}
- try {
- if (!tsm.getTableState(tn).isEnabled()) {
- continue;
- }
- } catch (TableStateNotFoundException e) {
- continue;
+ if (needReopen(tsm, tn)) {
+ addChildProcedure(env.getAssignmentManager().createReopenProcedures(
+ env.getAssignmentManager().getRegionStates().getRegionsOfTable(tn)));
}
- addChildProcedure(env.getAssignmentManager().createReopenProcedures(
- env.getAssignmentManager().getRegionStates().getRegionsOfTable(tn)));
}
}
@@ -183,6 +200,26 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
}
}
+ // If the table is currently disabling, then we need to wait until it is disabled. We will
+ // write a replication barrier for a disabled table. Returns whether we need to update the
+ // last pushed sequence id: if the table has been deleted already, i.e., we hit
+ // TableStateNotFoundException, then we do not need to update it for this table.
+ private boolean needSetLastPushedSequenceId(TableStateManager tsm, TableName tn)
+ throws IOException {
+ for (;;) {
+ try {
+ if (!tsm.getTableState(tn).isDisabling()) {
+ return true;
+ }
+ Thread.sleep(SLEEP_INTERVAL_MS);
+ } catch (TableStateNotFoundException e) {
+ return false;
+ } catch (InterruptedException e) {
+ throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e);
+ }
+ }
+ }
+
// Will put the encodedRegionName->lastPushedSeqId pair into the map passed in, if the map is
// large enough we will call queueStorage.setLastSequenceIds and clear the map. So the caller
// should not forget to check whether the map is empty at last, if not you should call
@@ -192,26 +229,13 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
TableStateManager tsm = env.getMasterServices().getTableStateManager();
ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage();
Connection conn = env.getMasterServices().getConnection();
- RegionStates regionStates = env.getAssignmentManager().getRegionStates();
- MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
- boolean isTableEnabled;
- try {
- isTableEnabled = tsm.getTableState(tableName).isEnabled();
- } catch (TableStateNotFoundException e) {
+ if (!needSetLastPushedSequenceId(tsm, tableName)) {
return;
}
- if (isTableEnabled) {
- for (Pair<String, Long> name2Barrier : MetaTableAccessor
- .getTableEncodedRegionNameAndLastBarrier(conn, tableName)) {
- addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1,
- queueStorage);
- }
- } else {
- for (RegionInfo region : regionStates.getRegionsOfTable(tableName, true)) {
- long maxSequenceId =
- WALSplitter.getMaxRegionSequenceId(mfs.getFileSystem(), mfs.getRegionDir(region));
- addToMap(lastSeqIds, region.getEncodedName(), maxSequenceId, queueStorage);
- }
+ for (Pair<String, Long> name2Barrier : MetaTableAccessor
+ .getTableEncodedRegionNameAndLastBarrier(conn, tableName)) {
+ addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1,
+ queueStorage);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5a633adf/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java
index 317c120..d3d6cbe 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.replication;
+import static org.junit.Assert.assertTrue;
+
import java.io.IOException;
import java.util.Collections;
import org.apache.hadoop.fs.Path;
@@ -26,6 +28,8 @@ import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableState;
+import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
@@ -192,4 +196,74 @@ public class TestAddToSerialReplicationPeer extends SerialReplicationTestBase {
waitUntilReplicationDone(100);
checkOrder(100);
}
+
+ @Test
+ public void testDisablingTable() throws Exception {
+ TableName tableName = createTable();
+ try (Table table = UTIL.getConnection().getTable(tableName)) {
+ for (int i = 0; i < 100; i++) {
+ table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+ }
+ }
+ UTIL.getAdmin().disableTable(tableName);
+ rollAllWALs();
+ TableStateManager tsm = UTIL.getMiniHBaseCluster().getMaster().getTableStateManager();
+ tsm.setTableState(tableName, TableState.State.DISABLING);
+ Thread t = new Thread(() -> {
+ try {
+ addPeer(true);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ t.start();
+ Thread.sleep(5000);
+ // we will wait on the disabling table so the thread should still be alive.
+ assertTrue(t.isAlive());
+ tsm.setTableState(tableName, TableState.State.DISABLED);
+ t.join();
+ UTIL.getAdmin().enableTable(tableName);
+ try (Table table = UTIL.getConnection().getTable(tableName)) {
+ for (int i = 0; i < 100; i++) {
+ table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+ }
+ }
+ waitUntilReplicationDone(100);
+ checkOrder(100);
+ }
+
+ @Test
+ public void testEnablingTable() throws Exception {
+ TableName tableName = createTable();
+ try (Table table = UTIL.getConnection().getTable(tableName)) {
+ for (int i = 0; i < 100; i++) {
+ table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+ }
+ }
+ RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
+ HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName));
+ moveRegionAndArchiveOldWals(region, rs);
+ TableStateManager tsm = UTIL.getMiniHBaseCluster().getMaster().getTableStateManager();
+ tsm.setTableState(tableName, TableState.State.ENABLING);
+ Thread t = new Thread(() -> {
+ try {
+ addPeer(true);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ t.start();
+ Thread.sleep(5000);
+ // we will wait on the enabling table so the thread should still be alive.
+ assertTrue(t.isAlive());
+ tsm.setTableState(tableName, TableState.State.ENABLED);
+ t.join();
+ try (Table table = UTIL.getConnection().getTable(tableName)) {
+ for (int i = 0; i < 100; i++) {
+ table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+ }
+ }
+ waitUntilReplicationDone(100);
+ checkOrder(100);
+ }
}