Posted to commits@hbase.apache.org by ap...@apache.org on 2017/11/08 01:43:30 UTC
[3/5] hbase git commit: HBASE-19128 Purge Distributed Log Replay from codebase, configurations, text; mark the feature as unsupported, broken.
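
For orientation before the hunks below: the worker-side split entry point drops its Connection and RecoveryMode parameters in this patch. A minimal call-site sketch under that assumption (variable names are illustrative, not taken from the patch):

    // Hedged sketch of the post-patch API shown in the WALSplitter hunk below;
    // rootDir, logfile, fs, conf, reporter, idChecker, splitLogWorkerCoordination
    // and walFactory are assumed to be in scope with the obvious HBase types.
    boolean finished = WALSplitter.splitLogFile(rootDir, logfile, fs, conf,
        reporter, idChecker, splitLogWorkerCoordination, walFactory);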
http://git-wip-us.apache.org/repos/asf/hbase/blob/4132314f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 215d2ed..39063a2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -18,9 +18,7 @@
*/
package org.apache.hadoop.hbase.wal;
-import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
-import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
@@ -32,7 +30,6 @@ import java.io.InterruptedIOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -50,7 +47,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
@@ -67,26 +63,13 @@ import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.TableState;
-import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
-import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@@ -95,24 +78,15 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.LastSequenceId;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
-import org.apache.hadoop.hbase.regionserver.wal.WALEditsReplaySink;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.ClassSize;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
@@ -122,11 +96,12 @@ import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.ipc.RemoteException;
+import org.apache.yetus.audience.InterfaceAudience;
/**
* This class is responsible for splitting up a bunch of regionserver commit log
- * files that are no longer being written to, into new files, one per region for
- * region to replay on startup. Delete the old log files when finished.
+ * files that are no longer being written to, into new files, one per region, for
+ * recovering data on startup. Delete the old log files when finished.
*/
@InterfaceAudience.Private
public class WALSplitter {
@@ -142,14 +117,10 @@ public class WALSplitter {
// Major subcomponents of the split process.
// These are separated into inner classes to make testing easier.
- PipelineController controller;
OutputSink outputSink;
- EntryBuffers entryBuffers;
+ private EntryBuffers entryBuffers;
- private Map<TableName, TableState> tableStatesCache =
- new ConcurrentHashMap<>();
private SplitLogWorkerCoordination splitLogWorkerCoordination;
- private Connection connection;
private final WALFactory walFactory;
private MonitoredTask status;
@@ -157,31 +128,19 @@ public class WALSplitter {
// For checking the latest flushed sequence id
protected final LastSequenceId sequenceIdChecker;
- protected boolean distributedLogReplay;
-
// Map encodedRegionName -> lastFlushedSequenceId
protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<>();
// Map encodedRegionName -> maxSeqIdInStores
protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores = new ConcurrentHashMap<>();
- // Failed region server that the wal file being split belongs to
- protected String failedServerName = "";
-
- // Number of writer threads
- private final int numWriterThreads;
-
- // Min batch size when replay WAL edits
- private final int minBatchSize;
-
// the file being split currently
private FileStatus fileBeingSplit;
@VisibleForTesting
WALSplitter(final WALFactory factory, Configuration conf, Path rootDir,
FileSystem fs, LastSequenceId idChecker,
- SplitLogWorkerCoordination splitLogWorkerCoordination, Connection connection,
- RecoveryMode mode) {
+ SplitLogWorkerCoordination splitLogWorkerCoordination) {
this.conf = HBaseConfiguration.create(conf);
String codecClassName = conf
.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
@@ -190,31 +149,15 @@ public class WALSplitter {
this.fs = fs;
this.sequenceIdChecker = idChecker;
this.splitLogWorkerCoordination = splitLogWorkerCoordination;
- this.connection = connection;
this.walFactory = factory;
- this.controller = new PipelineController();
+ PipelineController controller = new PipelineController();
entryBuffers = new EntryBuffers(controller,
- this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
- 128*1024*1024));
-
- // a larger minBatchSize may slow down recovery because replay writer has to wait for
- // enough edits before replaying them
- this.minBatchSize = this.conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 64);
- this.distributedLogReplay = (RecoveryMode.LOG_REPLAY == mode);
-
- this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
- if (this.splitLogWorkerCoordination != null && this.distributedLogReplay) {
- outputSink = new LogReplayOutputSink(controller, entryBuffers, numWriterThreads);
- } else {
- if (this.distributedLogReplay) {
- LOG.info("ZooKeeperWatcher is passed in as NULL so disable distrubitedLogRepaly.");
- }
- this.distributedLogReplay = false;
- outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
- }
+ this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize", 128 * 1024 * 1024));
+ int numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
+ outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
}
/**
@@ -227,10 +170,10 @@ public class WALSplitter {
*/
public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
- SplitLogWorkerCoordination splitLogWorkerCoordination, Connection connection,
- RecoveryMode mode, final WALFactory factory) throws IOException {
+ SplitLogWorkerCoordination splitLogWorkerCoordination, final WALFactory factory)
+ throws IOException {
WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, idChecker,
- splitLogWorkerCoordination, connection, mode);
+ splitLogWorkerCoordination);
return s.splitLogFile(logfile, reporter);
}
@@ -246,8 +189,7 @@ public class WALSplitter {
List<Path> splits = new ArrayList<>();
if (logfiles != null && logfiles.length > 0) {
for (FileStatus logfile: logfiles) {
- WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, null, null, null,
- RecoveryMode.LOG_SPLITTING);
+ WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, null, null);
if (s.splitLogFile(logfile, null)) {
finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
if (s.outputSink.splits != null) {
@@ -281,22 +223,20 @@ public class WALSplitter {
int editsCount = 0;
int editsSkipped = 0;
- status =
- TaskMonitor.get().createStatus(
+ status = TaskMonitor.get().createStatus(
"Splitting log file " + logfile.getPath() + "into a temporary staging area.");
- Reader in = null;
+ Reader logFileReader = null;
this.fileBeingSplit = logfile;
try {
long logLength = logfile.getLen();
- LOG.info("Splitting WAL=" + logPath + ", length=" + logLength +
- ", distributedLogReplay=" + this.distributedLogReplay);
+ LOG.info("Splitting WAL=" + logPath + ", length=" + logLength);
status.setStatus("Opening log file");
if (reporter != null && !reporter.progress()) {
progress_failed = true;
return false;
}
- in = getReader(logfile, skipErrors, reporter);
- if (in == null) {
+ logFileReader = getReader(logfile, skipErrors, reporter);
+ if (logFileReader == null) {
LOG.warn("Nothing to split in WAL=" + logPath);
return true;
}
@@ -307,26 +247,12 @@ public class WALSplitter {
outputSinkStarted = true;
Entry entry;
Long lastFlushedSequenceId = -1L;
- // THIS IS BROKEN!!!! GETTING SERVERNAME FROM PATH IS NOT GOING TO WORK IF LAYOUT CHANGES!!!
- // TODO: Fix.
- ServerName serverName = AbstractFSWALProvider.getServerNameFromWALDirectoryName(logPath);
- failedServerName = (serverName == null) ? "" : serverName.getServerName();
- while ((entry = getNextLogLine(in, logPath, skipErrors)) != null) {
+ while ((entry = getNextLogLine(logFileReader, logPath, skipErrors)) != null) {
byte[] region = entry.getKey().getEncodedRegionName();
String encodedRegionNameAsStr = Bytes.toString(region);
lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
if (lastFlushedSequenceId == null) {
- if (this.distributedLogReplay) {
- RegionStoreSequenceIds ids = splitLogWorkerCoordination.getRegionFlushedSequenceId(
- failedServerName, encodedRegionNameAsStr);
- if (ids != null) {
- lastFlushedSequenceId = ids.getLastFlushedSequenceId();
- if (LOG.isDebugEnabled()) {
- LOG.debug("DLR Last flushed sequenceid for " + encodedRegionNameAsStr + ": " +
- TextFormat.shortDebugString(ids));
- }
- }
- } else if (sequenceIdChecker != null) {
+ if (sequenceIdChecker != null) {
RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region);
Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) {
@@ -390,8 +316,8 @@ public class WALSplitter {
} finally {
LOG.debug("Finishing writing output logs and closing down.");
try {
- if (null != in) {
- in.close();
+ if (null != logFileReader) {
+ logFileReader.close();
}
} catch (IOException exception) {
LOG.warn("Could not close WAL reader: " + exception.getMessage());
@@ -954,7 +880,7 @@ public class WALSplitter {
}
/**
- * @return RegionEntryBuffer a buffer of edits to be written or replayed.
+ * @return RegionEntryBuffer a buffer of edits to be written.
*/
synchronized RegionEntryBuffer getChunkToWrite() {
long biggestSize = 0;
@@ -1127,8 +1053,8 @@ public class WALSplitter {
}
/**
- * The following class is an abstraction class to provide a common interface to support both
- * existing recovered edits file sink and region server WAL edits replay sink
+ * The following class is an abstraction class to provide a common interface to support
+ * different ways of consuming recovered edits.
*/
public static abstract class OutputSink {
@@ -1195,10 +1121,6 @@ public class WALSplitter {
}
}
- Long getRegionMaximumEditLogSeqNum(byte[] region) {
- return regionMaximumEditLogSeqNum.get(region);
- }
-
/**
* @return the number of currently opened writers
*/
@@ -1692,583 +1614,6 @@ public class WALSplitter {
}
}
- /**
- * Class that manages to replay edits from WAL files directly to assigned fail over region servers
- */
- class LogReplayOutputSink extends OutputSink {
- private static final double BUFFER_THRESHOLD = 0.35;
- private static final String KEY_DELIMITER = "#";
-
- private final long waitRegionOnlineTimeOut;
- private final Set<String> recoveredRegions = Collections.synchronizedSet(new HashSet<String>());
- private final Map<String, RegionServerWriter> writers = new ConcurrentHashMap<>();
- // online encoded region name -> region location map
- private final Map<String, HRegionLocation> onlineRegions = new ConcurrentHashMap<>();
-
- private final Map<TableName, ClusterConnection> tableNameToHConnectionMap = Collections
- .synchronizedMap(new TreeMap<TableName, ClusterConnection>());
- /**
- * Map key -> value layout
- * {@literal <servername>:<table name> -> Queue<Row>}
- */
- private final Map<String, List<Pair<HRegionLocation, Entry>>> serverToBufferQueueMap =
- new ConcurrentHashMap<>();
- private final List<Throwable> thrown = new ArrayList<>();
-
- // The following sink is used in distrubitedLogReplay mode for entries of regions in a disabling
- // table. It's a limitation of distributedLogReplay. Because log replay needs a region is
- // assigned and online before it can replay wal edits while regions of disabling/disabled table
- // won't be assigned by AM. We can retire this code after HBASE-8234.
- private final LogRecoveredEditsOutputSink logRecoveredEditsOutputSink;
- private boolean hasEditsInDisablingOrDisabledTables = false;
-
- public LogReplayOutputSink(PipelineController controller, EntryBuffers entryBuffers,
- int numWriters) {
- super(controller, entryBuffers, numWriters);
- this.waitRegionOnlineTimeOut =
- conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT,
- ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT);
- this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(controller,
- entryBuffers, numWriters);
- this.logRecoveredEditsOutputSink.setReporter(reporter);
- }
-
- @Override
- public void append(RegionEntryBuffer buffer) throws IOException {
- List<Entry> entries = buffer.entryBuffer;
- if (entries.isEmpty()) {
- LOG.warn("got an empty buffer, skipping");
- return;
- }
-
- // check if current region in a disabling or disabled table
- if (isTableDisabledOrDisabling(buffer.tableName)) {
- // need fall back to old way
- logRecoveredEditsOutputSink.append(buffer);
- hasEditsInDisablingOrDisabledTables = true;
- // store regions we have recovered so far
- addToRecoveredRegions(Bytes.toString(buffer.encodedRegionName));
- return;
- }
-
- // group entries by region servers
- groupEditsByServer(entries);
-
- // process workitems
- String maxLocKey = null;
- int maxSize = 0;
- List<Pair<HRegionLocation, Entry>> maxQueue = null;
- synchronized (this.serverToBufferQueueMap) {
- for (Map.Entry<String, List<Pair<HRegionLocation, Entry>>> entry :
- this.serverToBufferQueueMap.entrySet()) {
- List<Pair<HRegionLocation, Entry>> curQueue = entry.getValue();
- if (curQueue.size() > maxSize) {
- maxSize = curQueue.size();
- maxQueue = curQueue;
- maxLocKey = entry.getKey();
- }
- }
- if (maxSize < minBatchSize
- && entryBuffers.totalBuffered < BUFFER_THRESHOLD * entryBuffers.maxHeapUsage) {
- // buffer more to process
- return;
- } else if (maxSize > 0) {
- this.serverToBufferQueueMap.remove(maxLocKey);
- }
- }
-
- if (maxSize > 0) {
- processWorkItems(maxLocKey, maxQueue);
- }
- }
-
- private void addToRecoveredRegions(String encodedRegionName) {
- if (!recoveredRegions.contains(encodedRegionName)) {
- recoveredRegions.add(encodedRegionName);
- }
- }
-
- /**
- * Helper function to group WALEntries to individual region servers
- * @throws IOException
- */
- private void groupEditsByServer(List<Entry> entries) throws IOException {
- Set<TableName> nonExistentTables = null;
- Long cachedLastFlushedSequenceId = -1l;
- for (Entry entry : entries) {
- WALEdit edit = entry.getEdit();
- TableName table = entry.getKey().getTablename();
- // clear scopes which isn't needed for recovery
- entry.getKey().serializeReplicationScope(false);
- String encodeRegionNameStr = Bytes.toString(entry.getKey().getEncodedRegionName());
- // skip edits of non-existent tables
- if (nonExistentTables != null && nonExistentTables.contains(table)) {
- this.skippedEdits.incrementAndGet();
- continue;
- }
-
- Map<byte[], Long> maxStoreSequenceIds = null;
- boolean needSkip = false;
- HRegionLocation loc = null;
- String locKey = null;
- List<Cell> cells = edit.getCells();
- List<Cell> skippedCells = new ArrayList<>();
- ClusterConnection cconn = this.getConnectionByTableName(table);
-
- for (Cell cell : cells) {
- byte[] row = CellUtil.cloneRow(cell);
- byte[] family = CellUtil.cloneFamily(cell);
- boolean isCompactionEntry = false;
- if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
- CompactionDescriptor compaction = WALEdit.getCompaction(cell);
- if (compaction != null && compaction.hasRegionName()) {
- try {
- byte[][] regionName = HRegionInfo.parseRegionName(compaction.getRegionName()
- .toByteArray());
- row = regionName[1]; // startKey of the region
- family = compaction.getFamilyName().toByteArray();
- isCompactionEntry = true;
- } catch (Exception ex) {
- LOG.warn("Unexpected exception received, ignoring " + ex);
- skippedCells.add(cell);
- continue;
- }
- } else {
- skippedCells.add(cell);
- continue;
- }
- }
-
- try {
- loc =
- locateRegionAndRefreshLastFlushedSequenceId(cconn, table, row,
- encodeRegionNameStr);
- // skip replaying the compaction if the region is gone
- if (isCompactionEntry && !encodeRegionNameStr.equalsIgnoreCase(
- loc.getRegionInfo().getEncodedName())) {
- LOG.info("Not replaying a compaction marker for an older region: "
- + encodeRegionNameStr);
- needSkip = true;
- }
- } catch (TableNotFoundException ex) {
- // table has been deleted so skip edits of the table
- LOG.info("Table " + table + " doesn't exist. Skip log replay for region "
- + encodeRegionNameStr);
- lastFlushedSequenceIds.put(encodeRegionNameStr, Long.MAX_VALUE);
- if (nonExistentTables == null) {
- nonExistentTables = new TreeSet<>();
- }
- nonExistentTables.add(table);
- this.skippedEdits.incrementAndGet();
- needSkip = true;
- break;
- }
-
- cachedLastFlushedSequenceId =
- lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());
- if (cachedLastFlushedSequenceId != null
- && cachedLastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
- // skip the whole WAL entry
- this.skippedEdits.incrementAndGet();
- needSkip = true;
- break;
- } else {
- if (maxStoreSequenceIds == null) {
- maxStoreSequenceIds =
- regionMaxSeqIdInStores.get(loc.getRegionInfo().getEncodedName());
- }
- if (maxStoreSequenceIds != null) {
- Long maxStoreSeqId = maxStoreSequenceIds.get(family);
- if (maxStoreSeqId == null || maxStoreSeqId >= entry.getKey().getSequenceId()) {
- // skip current kv if column family doesn't exist anymore or already flushed
- skippedCells.add(cell);
- continue;
- }
- }
- }
- }
-
- // skip the edit
- if (loc == null || needSkip) continue;
-
- if (!skippedCells.isEmpty()) {
- cells.removeAll(skippedCells);
- }
-
- synchronized (serverToBufferQueueMap) {
- locKey = loc.getHostnamePort() + KEY_DELIMITER + table;
- List<Pair<HRegionLocation, Entry>> queue = serverToBufferQueueMap.get(locKey);
- if (queue == null) {
- queue =
- Collections.synchronizedList(new ArrayList<Pair<HRegionLocation, Entry>>());
- serverToBufferQueueMap.put(locKey, queue);
- }
- queue.add(new Pair<>(loc, entry));
- }
- // store regions we have recovered so far
- addToRecoveredRegions(loc.getRegionInfo().getEncodedName());
- }
- }
-
- /**
- * Locate destination region based on table name & row. This function also makes sure the
- * destination region is online for replay.
- * @throws IOException
- */
- private HRegionLocation locateRegionAndRefreshLastFlushedSequenceId(ClusterConnection cconn,
- TableName table, byte[] row, String originalEncodedRegionName) throws IOException {
- // fetch location from cache
- HRegionLocation loc = onlineRegions.get(originalEncodedRegionName);
- if(loc != null) return loc;
- // fetch location from hbase:meta directly without using cache to avoid hit old dead server
- loc = cconn.getRegionLocation(table, row, true);
- if (loc == null) {
- throw new IOException("Can't locate location for row:" + Bytes.toString(row)
- + " of table:" + table);
- }
- // check if current row moves to a different region due to region merge/split
- if (!originalEncodedRegionName.equalsIgnoreCase(loc.getRegionInfo().getEncodedName())) {
- // originalEncodedRegionName should have already flushed
- lastFlushedSequenceIds.put(originalEncodedRegionName, Long.MAX_VALUE);
- HRegionLocation tmpLoc = onlineRegions.get(loc.getRegionInfo().getEncodedName());
- if (tmpLoc != null) return tmpLoc;
- }
-
- Long lastFlushedSequenceId = -1L;
- AtomicBoolean isRecovering = new AtomicBoolean(true);
- loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut, isRecovering);
- if (!isRecovering.get()) {
- // region isn't in recovering at all because WAL file may contain a region that has
- // been moved to somewhere before hosting RS fails
- lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), Long.MAX_VALUE);
- LOG.info("logReplay skip region: " + loc.getRegionInfo().getEncodedName()
- + " because it's not in recovering.");
- } else {
- Long cachedLastFlushedSequenceId =
- lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());
-
- // retrieve last flushed sequence Id from ZK. Because region postOpenDeployTasks will
- // update the value for the region
- RegionStoreSequenceIds ids =
- splitLogWorkerCoordination.getRegionFlushedSequenceId(failedServerName,
- loc.getRegionInfo().getEncodedName());
- if (ids != null) {
- lastFlushedSequenceId = ids.getLastFlushedSequenceId();
- Map<byte[], Long> storeIds = new TreeMap<>(Bytes.BYTES_COMPARATOR);
- List<StoreSequenceId> maxSeqIdInStores = ids.getStoreSequenceIdList();
- for (StoreSequenceId id : maxSeqIdInStores) {
- storeIds.put(id.getFamilyName().toByteArray(), id.getSequenceId());
- }
- regionMaxSeqIdInStores.put(loc.getRegionInfo().getEncodedName(), storeIds);
- }
-
- if (cachedLastFlushedSequenceId == null
- || lastFlushedSequenceId > cachedLastFlushedSequenceId) {
- lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), lastFlushedSequenceId);
- }
- }
-
- onlineRegions.put(loc.getRegionInfo().getEncodedName(), loc);
- return loc;
- }
-
- private void processWorkItems(String key, List<Pair<HRegionLocation, Entry>> actions)
- throws IOException {
- RegionServerWriter rsw = null;
-
- long startTime = System.nanoTime();
- try {
- rsw = getRegionServerWriter(key);
- rsw.sink.replayEntries(actions);
-
- // Pass along summary statistics
- rsw.incrementEdits(actions.size());
- rsw.incrementNanoTime(System.nanoTime() - startTime);
- } catch (IOException e) {
- e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
- LOG.fatal(" Got while writing log entry to log", e);
- throw e;
- }
- }
-
- /**
- * Wait until region is online on the destination region server
- * @param loc
- * @param row
- * @param timeout How long to wait
- * @param isRecovering Recovering state of the region interested on destination region server.
- * @return True when region is online on the destination region server
- * @throws InterruptedException
- */
- private HRegionLocation waitUntilRegionOnline(HRegionLocation loc, byte[] row,
- final long timeout, AtomicBoolean isRecovering)
- throws IOException {
- final long endTime = EnvironmentEdgeManager.currentTime() + timeout;
- final long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
- HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
- boolean reloadLocation = false;
- TableName tableName = loc.getRegionInfo().getTable();
- int tries = 0;
- Throwable cause = null;
- while (endTime > EnvironmentEdgeManager.currentTime()) {
- try {
- // Try and get regioninfo from the hosting server.
- ClusterConnection cconn = getConnectionByTableName(tableName);
- if(reloadLocation) {
- loc = cconn.getRegionLocation(tableName, row, true);
- }
- BlockingInterface remoteSvr = cconn.getAdmin(loc.getServerName());
- HRegionInfo region = loc.getRegionInfo();
- try {
- GetRegionInfoRequest request =
- RequestConverter.buildGetRegionInfoRequest(region.getRegionName());
- GetRegionInfoResponse response = remoteSvr.getRegionInfo(null, request);
- if (HRegionInfo.convert(response.getRegionInfo()) != null) {
- isRecovering.set((response.hasIsRecovering()) ? response.getIsRecovering() : true);
- return loc;
- }
- } catch (org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException e) {
- throw ProtobufUtil.handleRemoteException(e);
- }
- } catch (IOException e) {
- cause = e.getCause();
- if(!(cause instanceof RegionOpeningException)) {
- reloadLocation = true;
- }
- }
- long expectedSleep = ConnectionUtils.getPauseTime(pause, tries);
- try {
- Thread.sleep(expectedSleep);
- } catch (InterruptedException e) {
- throw new IOException("Interrupted when waiting region " +
- loc.getRegionInfo().getEncodedName() + " online.", e);
- }
- tries++;
- }
-
- throw new IOException("Timeout when waiting region " + loc.getRegionInfo().getEncodedName() +
- " online for " + timeout + " milliseconds.", cause);
- }
-
- @Override
- public boolean flush() throws IOException {
- String curLoc = null;
- int curSize = 0;
- List<Pair<HRegionLocation, Entry>> curQueue = null;
- synchronized (this.serverToBufferQueueMap) {
- for (Map.Entry<String, List<Pair<HRegionLocation, Entry>>> entry :
- this.serverToBufferQueueMap.entrySet()) {
- curQueue = entry.getValue();
- if (!curQueue.isEmpty()) {
- curSize = curQueue.size();
- curLoc = entry.getKey();
- break;
- }
- }
- if (curSize > 0) {
- this.serverToBufferQueueMap.remove(curLoc);
- }
- }
-
- if (curSize > 0) {
- this.processWorkItems(curLoc, curQueue);
- // We should already have control of the monitor; ensure this is the case.
- synchronized(controller.dataAvailable) {
- controller.dataAvailable.notifyAll();
- }
- return true;
- }
- return false;
- }
-
- @Override
- public boolean keepRegionEvent(Entry entry) {
- return true;
- }
-
- void addWriterError(Throwable t) {
- thrown.add(t);
- }
-
- @Override
- public List<Path> finishWritingAndClose() throws IOException {
- try {
- if (!finishWriting(false)) {
- return null;
- }
- if (hasEditsInDisablingOrDisabledTables) {
- splits = logRecoveredEditsOutputSink.finishWritingAndClose();
- } else {
- splits = new ArrayList<>();
- }
- // returns an empty array in order to keep interface same as old way
- return splits;
- } finally {
- List<IOException> thrown = closeRegionServerWriters();
- if (thrown != null && !thrown.isEmpty()) {
- throw MultipleIOException.createIOException(thrown);
- }
- }
- }
-
- @Override
- int getNumOpenWriters() {
- return this.writers.size() + this.logRecoveredEditsOutputSink.getNumOpenWriters();
- }
-
- private List<IOException> closeRegionServerWriters() throws IOException {
- List<IOException> result = null;
- if (!writersClosed) {
- result = Lists.newArrayList();
- try {
- for (WriterThread t : writerThreads) {
- while (t.isAlive()) {
- t.shouldStop = true;
- t.interrupt();
- try {
- t.join(10);
- } catch (InterruptedException e) {
- IOException iie = new InterruptedIOException();
- iie.initCause(e);
- throw iie;
- }
- }
- }
- } finally {
- synchronized (writers) {
- for (Map.Entry<String, RegionServerWriter> entry : writers.entrySet()) {
- RegionServerWriter tmpW = entry.getValue();
- try {
- tmpW.close();
- } catch (IOException ioe) {
- LOG.error("Couldn't close writer for region server:" + entry.getKey(), ioe);
- result.add(ioe);
- }
- }
- }
-
- // close connections
- synchronized (this.tableNameToHConnectionMap) {
- for (Map.Entry<TableName, ClusterConnection> entry :
- this.tableNameToHConnectionMap.entrySet()) {
- ClusterConnection cconn = entry.getValue();
- try {
- cconn.clearRegionCache();
- cconn.close();
- } catch (IOException ioe) {
- result.add(ioe);
- }
- }
- }
- writersClosed = true;
- }
- }
- return result;
- }
-
- @Override
- public Map<byte[], Long> getOutputCounts() {
- TreeMap<byte[], Long> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR);
- synchronized (writers) {
- for (Map.Entry<String, RegionServerWriter> entry : writers.entrySet()) {
- ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
- }
- }
- return ret;
- }
-
- @Override
- public int getNumberOfRecoveredRegions() {
- return this.recoveredRegions.size();
- }
-
- private boolean isTableDisabledOrDisabling(TableName tableName) {
- if (connection == null)
- return false; // we can't get state without CoordinatedStateManager
- if (tableName.isSystemTable())
- return false; // assume that system tables never can be disabled
- TableState tableState = tableStatesCache.get(tableName);
- if (tableState == null) {
- try {
- tableState = MetaTableAccessor.getTableState(connection, tableName);
- if (tableState != null)
- tableStatesCache.put(tableName, tableState);
- } catch (IOException e) {
- LOG.warn("State is not accessible for table " + tableName, e);
- }
- }
- return tableState != null && tableState
- .inStates(TableState.State.DISABLED, TableState.State.DISABLING);
- }
-
- /**
- * Get a writer and path for a log starting at the given entry. This function is threadsafe so
- * long as multiple threads are always acting on different regions.
- * @return null if this region shouldn't output any logs
- */
- private RegionServerWriter getRegionServerWriter(String loc) throws IOException {
- RegionServerWriter ret = writers.get(loc);
- if (ret != null) {
- return ret;
- }
-
- TableName tableName = getTableFromLocationStr(loc);
- if(tableName == null){
- throw new IOException("Invalid location string:" + loc + " found. Replay aborted.");
- }
-
- ClusterConnection hconn = getConnectionByTableName(tableName);
- synchronized (writers) {
- ret = writers.get(loc);
- if (ret == null) {
- ret = new RegionServerWriter(conf, tableName, hconn);
- writers.put(loc, ret);
- }
- }
- return ret;
- }
-
- private ClusterConnection getConnectionByTableName(final TableName tableName) throws IOException {
- ClusterConnection cconn = this.tableNameToHConnectionMap.get(tableName);
- if (cconn == null) {
- synchronized (this.tableNameToHConnectionMap) {
- cconn = this.tableNameToHConnectionMap.get(tableName);
- if (cconn == null) {
- cconn = (ClusterConnection) ConnectionFactory.createConnection(conf);
- this.tableNameToHConnectionMap.put(tableName, cconn);
- }
- }
- }
- return cconn;
- }
- private TableName getTableFromLocationStr(String loc) {
- /**
- * location key is in format {@literal <server name:port>#<table name>}
- */
- String[] splits = loc.split(KEY_DELIMITER);
- if (splits.length != 2) {
- return null;
- }
- return TableName.valueOf(splits[1]);
- }
- }
-
- /**
- * Private data structure that wraps a receiving RS and collecting statistics about the data
- * written to this newly assigned RS.
- */
- private final static class RegionServerWriter extends SinkWriter {
- final WALEditsReplaySink sink;
-
- RegionServerWriter(final Configuration conf, final TableName tableName, final ClusterConnection conn)
- throws IOException {
- this.sink = new WALEditsReplaySink(conf, tableName, conn);
- }
-
- void close() throws IOException {
- }
- }
-
static class CorruptedLogFileException extends Exception {
private static final long serialVersionUID = 1L;
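
With LogReplayOutputSink deleted above, LogRecoveredEditsOutputSink is the only sink left: entries are bucketed per region and written to recovered.edits files that regions replay when they next open. A minimal sketch of that grouping step, assuming the usual org.apache.hadoop.hbase.wal types; this is an illustration, not the EntryBuffers implementation:

    // Bucket entries from a dead server's WAL by encoded region name; each
    // bucket would then land in that region's recovered.edits file.
    Map<byte[], List<WAL.Entry>> edits = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    try (WAL.Reader reader = walFactory.createReader(fs, logPath)) {
      WAL.Entry entry;
      while ((entry = reader.next()) != null) {
        edits.computeIfAbsent(entry.getKey().getEncodedRegionName(),
            k -> new ArrayList<>()).add(entry);
      }
    }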
http://git-wip-us.apache.org/repos/asf/hbase/blob/4132314f/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoveringRegionWatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoveringRegionWatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoveringRegionWatcher.java
deleted file mode 100644
index 16485ee..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoveringRegionWatcher.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.zookeeper;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.handler.FinishRegionRecoveringHandler;
-import org.apache.zookeeper.KeeperException;
-
-/**
- * Watcher used to be notified of the recovering region coming out of recovering state
- */
-@InterfaceAudience.Private
-public class RecoveringRegionWatcher extends ZooKeeperListener {
- private static final Log LOG = LogFactory.getLog(RecoveringRegionWatcher.class);
-
- private HRegionServer server;
-
- /**
- * Construct a ZooKeeper event listener.
- */
- public RecoveringRegionWatcher(ZooKeeperWatcher watcher, HRegionServer server) {
- super(watcher);
- watcher.registerListener(this);
- this.server = server;
- }
-
- /**
- * Called when a node has been deleted
- * @param path full path of the deleted node
- */
- @Override
- public void nodeDeleted(String path) {
- if (this.server.isStopped() || this.server.isStopping()) {
- return;
- }
-
- String parentPath = path.substring(0, path.lastIndexOf('/'));
- if (!this.watcher.znodePaths.recoveringRegionsZNode.equalsIgnoreCase(parentPath)) {
- return;
- }
-
- String regionName = path.substring(parentPath.length() + 1);
-
- server.getExecutorService().submit(new FinishRegionRecoveringHandler(server, regionName, path));
- }
-
- @Override
- public void nodeDataChanged(String path) {
- registerWatcher(path);
- }
-
- @Override
- public void nodeChildrenChanged(String path) {
- registerWatcher(path);
- }
-
- /**
- * Reinstall watcher because watcher only fire once though we're only interested in nodeDeleted
- * event we need to register the watcher in case other event happens
- */
- private void registerWatcher(String path) {
- String parentPath = path.substring(0, path.lastIndexOf('/'));
- if (!this.watcher.znodePaths.recoveringRegionsZNode.equalsIgnoreCase(parentPath)) {
- return;
- }
-
- try {
- ZKUtil.getDataAndWatch(watcher, path);
- } catch (KeeperException e) {
- LOG.warn("Can't register watcher on znode " + path, e);
- }
- }
-}
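
The watcher deleted above existed because ZooKeeper watches are one-shot: a handler that wants further notifications must re-set the watch on every event, which is what its registerWatcher() did. A generic sketch of that re-arm pattern against the plain ZooKeeper client API (not HBase code; path handling simplified):

    import org.apache.zookeeper.*;

    // One-shot watches: re-arm by re-reading the node with ourselves as watcher.
    class ReRegisteringWatcher implements Watcher {
      private final ZooKeeper zk;
      private final String path;
      ReRegisteringWatcher(ZooKeeper zk, String path) { this.zk = zk; this.path = path; }
      @Override public void process(WatchedEvent event) {
        try {
          zk.getData(path, this, null); // sets the watch for the next event
        } catch (KeeperException | InterruptedException e) {
          // real code would log and decide whether to retry or give up
        }
      }
    }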
http://git-wip-us.apache.org/repos/asf/hbase/blob/4132314f/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
index 05cd8a2..30e988f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
@@ -18,11 +18,9 @@
package org.apache.hadoop.hbase.zookeeper;
import java.io.IOException;
-import java.io.InterruptedIOException;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
-import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -30,12 +28,9 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
-import org.apache.zookeeper.KeeperException;
/**
- * Common methods and attributes used by {@link org.apache.hadoop.hbase.master.SplitLogManager}
+ * Common methods and attributes used by {@link org.apache.hadoop.hbase.master.SplitLogManager}
* and {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker}
* running distributed splitting of WAL logs.
*/
@@ -104,20 +99,10 @@ public class ZKSplitLog {
return true;
}
- public static boolean isTaskPath(ZooKeeperWatcher zkw, String path) {
- String dirname = path.substring(0, path.lastIndexOf('/'));
- return dirname.equals(zkw.znodePaths.splitLogZNode);
- }
-
public static Path getSplitLogDir(Path rootdir, String tmpname) {
return new Path(new Path(rootdir, HConstants.SPLIT_LOGDIR_NAME), tmpname);
}
-
- public static String getSplitLogDirTmpComponent(final String worker, String file) {
- return worker + "_" + ZKSplitLog.encode(file);
- }
-
public static void markCorrupted(Path rootdir, String logFileName,
FileSystem fs) {
Path file = new Path(getSplitLogDir(rootdir, logFileName), "corrupt");
@@ -136,101 +121,4 @@ public class ZKSplitLog {
isCorrupt = fs.exists(file);
return isCorrupt;
}
-
- /*
- * Following methods come from SplitLogManager
- */
-
- /**
- * check if /hbase/recovering-regions/<current region encoded name>
- * exists. Returns true if exists and set watcher as well.
- * @param zkw
- * @param regionEncodedName region encode name
- * @return true when /hbase/recovering-regions/<current region encoded name> exists
- * @throws KeeperException
- */
- public static boolean
- isRegionMarkedRecoveringInZK(ZooKeeperWatcher zkw, String regionEncodedName)
- throws KeeperException {
- boolean result = false;
- String nodePath = ZKUtil.joinZNode(zkw.znodePaths.recoveringRegionsZNode, regionEncodedName);
-
- byte[] node = ZKUtil.getDataAndWatch(zkw, nodePath);
- if (node != null) {
- result = true;
- }
- return result;
- }
-
- /**
- * @param bytes - Content of a failed region server or recovering region znode.
- * @return long - The last flushed sequence Id for the region server
- */
- public static long parseLastFlushedSequenceIdFrom(final byte[] bytes) {
- long lastRecordedFlushedSequenceId = -1l;
- try {
- lastRecordedFlushedSequenceId = ZKUtil.parseWALPositionFrom(bytes);
- } catch (DeserializationException e) {
- lastRecordedFlushedSequenceId = -1l;
- LOG.warn("Can't parse last flushed sequence Id", e);
- }
- return lastRecordedFlushedSequenceId;
- }
-
- public static void deleteRecoveringRegionZNodes(ZooKeeperWatcher watcher, List<String> regions) {
- try {
- if (regions == null) {
- // remove all children under /home/recovering-regions
- LOG.debug("Garbage collecting all recovering region znodes");
- ZKUtil.deleteChildrenRecursively(watcher, watcher.znodePaths.recoveringRegionsZNode);
- } else {
- for (String curRegion : regions) {
- String nodePath = ZKUtil.joinZNode(watcher.znodePaths.recoveringRegionsZNode, curRegion);
- ZKUtil.deleteNodeRecursively(watcher, nodePath);
- }
- }
- } catch (KeeperException e) {
- LOG.warn("Cannot remove recovering regions from ZooKeeper", e);
- }
- }
-
- /**
- * This function is used in distributedLogReplay to fetch last flushed sequence id from ZK
- * @param zkw
- * @param serverName
- * @param encodedRegionName
- * @return the last flushed sequence ids recorded in ZK of the region for <code>serverName</code>
- * @throws IOException
- */
-
- public static RegionStoreSequenceIds getRegionFlushedSequenceId(ZooKeeperWatcher zkw,
- String serverName, String encodedRegionName) throws IOException {
- // when SplitLogWorker recovers a region by directly replaying unflushed WAL edits,
- // last flushed sequence Id changes when newly assigned RS flushes writes to the region.
- // If the newly assigned RS fails again(a chained RS failures scenario), the last flushed
- // sequence Id name space (sequence Id only valid for a particular RS instance), changes
- // when different newly assigned RS flushes the region.
- // Therefore, in this mode we need to fetch last sequence Ids from ZK where we keep history of
- // last flushed sequence Id for each failed RS instance.
- RegionStoreSequenceIds result = null;
- String nodePath = ZKUtil.joinZNode(zkw.znodePaths.recoveringRegionsZNode, encodedRegionName);
- nodePath = ZKUtil.joinZNode(nodePath, serverName);
- try {
- byte[] data;
- try {
- data = ZKUtil.getData(zkw, nodePath);
- } catch (InterruptedException e) {
- throw new InterruptedIOException();
- }
- if (data != null) {
- result = ZKUtil.parseRegionStoreSequenceIds(data);
- }
- } catch (KeeperException e) {
- throw new IOException("Cannot get lastFlushedSequenceId from ZooKeeper for server="
- + serverName + "; region=" + encodedRegionName, e);
- } catch (DeserializationException e) {
- LOG.warn("Can't parse last flushed sequence Id from znode:" + nodePath, e);
- }
- return result;
- }
}
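
After this hunk only the filesystem-side helpers of ZKSplitLog remain. A hedged usage sketch of the surviving corrupt-marker pair (conf, fs and logfileName are assumed to be in scope):

    // Flag a WAL as corrupt under its split-log staging dir, then test the flag.
    Path rootDir = FSUtils.getRootDir(conf);            // HBase root directory
    ZKSplitLog.markCorrupted(rootDir, logfileName, fs); // creates ".../corrupt"
    boolean corrupt = ZKSplitLog.isCorrupted(rootDir, logfileName, fs);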
http://git-wip-us.apache.org/repos/asf/hbase/blob/4132314f/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
index 58a0055..cac6fd6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
@@ -265,12 +265,6 @@ public class MockRegionServerServices implements RegionServerServices {
}
@Override
- public Map<String, HRegion> getRecoveringRegions() {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
public ServerNonceManager getNonceManager() {
// TODO Auto-generated method stub
return null;
http://git-wip-us.apache.org/repos/asf/hbase/blob/4132314f/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index 2ef200f..643940c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -323,7 +323,7 @@ public class TestIOFencing {
@Override
public boolean evaluate() throws Exception {
Region newRegion = newServer.getOnlineRegion(REGION_NAME);
- return newRegion != null && !newRegion.isRecovering();
+ return newRegion != null;
}
});
http://git-wip-us.apache.org/repos/asf/hbase/blob/4132314f/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSerialization.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSerialization.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSerialization.java
index e3c1df3..f60c1e5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSerialization.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSerialization.java
@@ -36,20 +36,16 @@ import java.util.Set;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.BinaryComparator;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.DataInputBuffer;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -121,15 +117,6 @@ public class TestSerialization {
}
- @Test
- public void testSplitLogTask() throws DeserializationException {
- SplitLogTask slt = new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"),
- RecoveryMode.LOG_REPLAY);
- byte [] bytes = slt.toByteArray();
- SplitLogTask sltDeserialized = SplitLogTask.parseFrom(bytes);
- assertTrue(slt.equals(sltDeserialized));
- }
-
@Test public void testCompareFilter() throws Exception {
Filter f = new RowFilter(CompareOperator.EQUAL,
new BinaryComparator(Bytes.toBytes("testRowOne-2")));
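
The removed testSplitLogTask existed only to cover RecoveryMode.LOG_REPLAY. If an equivalent round trip is still wanted, a sketch under the assumption that SplitLogTask constructors now take just a ServerName (an assumption consistent with this purge, not shown in this hunk):

    // Hypothetical post-purge round trip; parseFrom throws DeserializationException.
    SplitLogTask slt = new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"));
    byte[] bytes = slt.toByteArray();
    assertTrue(slt.equals(SplitLogTask.parseFrom(bytes)));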
http://git-wip-us.apache.org/repos/asf/hbase/blob/4132314f/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
index 68770b9..a34b651 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
@@ -264,7 +264,7 @@ public class TestReplicasClient {
} catch (Exception e){}
// first version is '0'
AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(
- getRS().getServerName(), hri, null, null);
+ getRS().getServerName(), hri, null);
AdminProtos.OpenRegionResponse responseOpen = getRS().getRSRpcServices().openRegion(null, orr);
Assert.assertEquals(responseOpen.getOpeningStateCount(), 1);
Assert.assertEquals(responseOpen.getOpeningState(0),
http://git-wip-us.apache.org/repos/asf/hbase/blob/4132314f/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index 119c225..964bb26 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -557,11 +557,6 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
}
@Override
- public Map<String, HRegion> getRecoveringRegions() {
- return null;
- }
-
- @Override
public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
UpdateFavoredNodesRequest request) throws ServiceException {
return null;