Posted to commits@hbase.apache.org by je...@apache.org on 2014/12/01 20:17:35 UTC

hbase git commit: Revert "Remove Replay Tag"

Repository: hbase
Updated Branches:
  refs/heads/master f723ffde7 -> 073345047


Revert "Remove Replay Tag"

This reverts commit f723ffde7ffa93ddd4c40456c917911817f314b1.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/07334504
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/07334504
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/07334504

Branch: refs/heads/master
Commit: 0733450473c422c3857f21c57651c9124bd8e25b
Parents: f723ffd
Author: Jeffrey Zhong <je...@apache.org>
Authored: Mon Dec 1 11:17:24 2014 -0800
Committer: Jeffrey Zhong <je...@apache.org>
Committed: Mon Dec 1 11:17:24 2014 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/CellComparator.java | 29 +++++++++++++++
 .../java/org/apache/hadoop/hbase/KeyValue.java  |  9 +++--
 .../java/org/apache/hadoop/hbase/TagType.java   |  2 +-
 .../ZKSplitLogManagerCoordination.java          | 10 +++---
 .../hadoop/hbase/regionserver/HRegion.java      | 12 ++-----
 .../hbase/regionserver/RSRpcServices.java       |  4 ++-
 .../compactions/DefaultCompactor.java           |  3 +-
 .../compactions/StripeCompactor.java            |  4 +--
 .../apache/hadoop/hbase/wal/WALSplitter.java    | 37 ++++++++++++++++++--
 .../master/TestDistributedLogSplitting.java     |  4 ++-
 10 files changed, 87 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/07334504/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java
index d760aa2..9b7107b 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java
@@ -68,6 +68,19 @@ public class CellComparator implements Comparator<Cell>, Serializable {
 
     if (!ignoreSequenceid) {
       // Negate following comparisons so later edits show up first
+
+      // compare log replay tag value if there is any
+      // when either keyvalue is tagged with a log replay sequence number, we need to compare them:
+      // 1) when both keyvalues have the tag, use the tag values for comparison
+      // 2) when one has the tag and the other doesn't, the one without the
+      // log replay tag wins, because that edit isn't from recovery but a new
+      // one coming from clients during recovery
+      // 3) when neither has the tag, skip to the next mvcc comparison
+      long leftChangeSeqNum = getReplaySeqNum(a);
+      long rightChangeSeqNum = getReplaySeqNum(b);
+      if (leftChangeSeqNum != Long.MAX_VALUE || rightChangeSeqNum != Long.MAX_VALUE) {
+        return Longs.compare(rightChangeSeqNum, leftChangeSeqNum);
+      }
       // mvccVersion: later sorts first
       return Longs.compare(b.getMvccVersion(), a.getMvccVersion());
     } else {
@@ -75,6 +88,22 @@ public class CellComparator implements Comparator<Cell>, Serializable {
     }
   }
 
+  /**
+   * Return the replay log sequence number for the cell
+   *
+   * @param c the cell whose tags are inspected
+   * @return the tag value, or Long.MAX_VALUE if there is no LOG_REPLAY_TAG
+   */
+  private static long getReplaySeqNum(final Cell c) {
+    Tag tag = Tag.getTag(c.getTagsArray(), c.getTagsOffset(), c.getTagsLength(),
+        TagType.LOG_REPLAY_TAG_TYPE);
+
+    if (tag != null) {
+      return Bytes.toLong(tag.getBuffer(), tag.getTagOffset(), tag.getTagLength());
+    }
+    return Long.MAX_VALUE;
+  }
+
   public static int findCommonPrefixInRowPart(Cell left, Cell right, int rowCommonPrefix) {
     return findCommonPrefix(left.getRowArray(), right.getRowArray(), left.getRowLength()
         - rowCommonPrefix, right.getRowLength() - rowCommonPrefix, left.getRowOffset()

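The three ordering rules spelled out in the restored comment above can be condensed into a small,
self-contained sketch. This is not the HBase CellComparator itself: plain longs stand in for cells,
and a missing LOG_REPLAY_TAG is modeled as Long.MAX_VALUE, the same sentinel getReplaySeqNum()
returns when a cell carries no replay tag.

public class ReplaySeqNumOrderingSketch {
  static final long NO_TAG = Long.MAX_VALUE;

  // Mirrors the restored branch: if either side carries a replay sequence number,
  // the higher value sorts first (later edits win); an untagged (client) edit
  // therefore sorts ahead of any replayed edit.
  static int compareBySeqNum(long left, long right) {
    if (left != NO_TAG || right != NO_TAG) {
      return Long.compare(right, left); // negated so later edits show up first
    }
    return 0; // neither side is tagged: fall through to the mvcc comparison
  }

  public static void main(String[] args) {
    System.out.println(compareBySeqNum(5L, 9L));         // positive: the edit with seq 9 sorts first
    System.out.println(compareBySeqNum(NO_TAG, 9L));     // negative: the untagged client edit sorts first
    System.out.println(compareBySeqNum(NO_TAG, NO_TAG)); // 0: defer to the mvccVersion comparison
  }
}
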
http://git-wip-us.apache.org/repos/asf/hbase/blob/07334504/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
index 8566a88..516fd81 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
@@ -746,7 +746,6 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
         c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(), 
         c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(), 
         c.getValueLength(), c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
-    this.seqId = c.getSequenceId();
   }
 
   /**
@@ -962,8 +961,8 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
     checkForTagsLength(tagsLength);
     // Allocate right-sized byte array.
     int keyLength = (int) getKeyDataStructureSize(rlength, flength, qlength);
-    byte[] bytes = new byte[(int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength,
-      tagsLength)];
+    byte [] bytes =
+        new byte[(int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength, tagsLength)];
     // Write key, value and key row length.
     int pos = 0;
     pos = Bytes.putInt(bytes, pos, keyLength);
@@ -2493,8 +2492,8 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
      * Compare two keys assuming that the first n bytes are the same.
      * @param commonPrefix How many bytes are the same.
      */
-    int compareIgnoringPrefix(int commonPrefix, byte[] left, int loffset, int llength,
-        byte[] right, int roffset, int rlength
+    int compareIgnoringPrefix(
+      int commonPrefix, byte[] left, int loffset, int llength, byte[] right, int roffset, int rlength
     );
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/07334504/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
index f088dcf..b113516 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
@@ -26,7 +26,7 @@ public final class TagType {
   // Please declare new Tag Types here to avoid step on pre-existing tag types.
   public static final byte ACL_TAG_TYPE = (byte) 1;
   public static final byte VISIBILITY_TAG_TYPE = (byte) 2;
-  // public static final byte LOG_REPLAY_TAG_TYPE = (byte) 3; // deprecated
+  public static final byte LOG_REPLAY_TAG_TYPE = (byte) 3;
   public static final byte VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE = (byte)4;
   // String based tag type used in replication
   public static final byte STRING_VIS_TAG_TYPE = (byte) 7;

http://git-wip-us.apache.org/repos/asf/hbase/blob/07334504/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
index 694ccff..4f511f0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
@@ -773,8 +773,8 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
   public void setRecoveryMode(boolean isForInitialization) throws IOException {
     synchronized(this) {
       if (this.isDrainingDone) {
-        // when there is no outstanding splitlogtask after master start up, we already have up to 
-        // date recovery mode
+        // when there is no outstanding splitlogtask after master start up, we already have up to date
+        // recovery mode
         return;
       }
     }
@@ -866,10 +866,12 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
     boolean dlr =
         conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
           HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
+    int version = conf.getInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Distributed log replay=" + dlr);
+      LOG.debug("Distributed log replay=" + dlr + ", " + HFile.FORMAT_VERSION_KEY + "=" + version);
     }
-    return dlr;
+    // For distributed log replay, the hfile version must be at least 3; we need tag support.
+    return dlr && (version >= 3);
   }
 
   private boolean resubmit(ServerName serverName, String path, int version) {

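The restored return statement gates distributed log replay on the hfile format version, because the
replay tag added during recovery needs HFile v3 tag support. Below is a rough stand-alone sketch of
that gate; java.util.Properties stands in for the HBase Configuration, "hfile.format.version" is the
key the test changes below set explicitly, and the distributed-log-replay key string is an assumption
standing in for HConstants.DISTRIBUTED_LOG_REPLAY_KEY.

import java.util.Properties;

public class DlrGateSketch {
  static boolean isDistributedLogReplayEnabled(Properties conf) {
    // Key names are illustrative stand-ins for the constants used in the diff.
    boolean dlr = Boolean.parseBoolean(
        conf.getProperty("hbase.master.distributed.log.replay", "false"));
    int hfileVersion = Integer.parseInt(conf.getProperty("hfile.format.version", "2"));
    // For distributed log replay the hfile version must be at least 3: tags are required.
    return dlr && hfileVersion >= 3;
  }

  public static void main(String[] args) {
    Properties conf = new Properties();
    conf.setProperty("hbase.master.distributed.log.replay", "true");
    conf.setProperty("hfile.format.version", "2");
    System.out.println(isDistributedLogReplayEnabled(conf)); // false: v2 files cannot carry tags
    conf.setProperty("hfile.format.version", "3");
    System.out.println(isDistributedLogReplayEnabled(conf)); // true
  }
}
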
http://git-wip-us.apache.org/repos/asf/hbase/blob/07334504/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index d5c07f0..4a4d004 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2684,7 +2684,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
           continue;
         }
         doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote
-        addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells, isInReplay);
+        addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells);
       }
 
       // ------------------------------------
@@ -3189,13 +3189,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
    * @param localizedWriteEntry The WriteEntry of the MVCC for this transaction.
    *        If null, then this method internally creates a mvcc transaction.
    * @param output newly added KVs into memstore
-   * @param isInReplay true when adding replayed KVs into memstore
    * @return the additional memory usage of the memstore caused by the
    * new entries.
    * @throws IOException
    */
   private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
-    long mvccNum, List<Cell> memstoreCells, boolean isInReplay) throws IOException {
+    long mvccNum, List<Cell> memstoreCells) throws IOException {
     long size = 0;
 
     for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
@@ -3210,10 +3209,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
         Pair<Long, Cell> ret = store.add(cell);
         size += ret.getFirst();
         memstoreCells.add(ret.getSecond());
-        if(isInReplay) {
-          // set memstore newly added cells with replay mvcc number
-          CellUtil.setSequenceId(ret.getSecond(), mvccNum);
-        }
       }
     }
 
@@ -3412,8 +3407,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       }
 
       try {
-        // replay the edits. Replay can return -1 if everything is skipped, only update
-        // if seqId is greater
+        // replay the edits. Replay can return -1 if everything is skipped, only update if seqId is greater
         seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter));
       } catch (IOException e) {
         boolean skipErrors = conf.getBoolean(

http://git-wip-us.apache.org/repos/asf/hbase/blob/07334504/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 8821803..dd895e0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1430,6 +1430,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
             ? region.getCoprocessorHost()
             : null; // do not invoke coprocessors if this is a secondary region replica
       List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<Pair<WALKey, WALEdit>>();
+      // when tags are enabled, we need to tag replay edits with the log sequence number
+      boolean needAddReplayTag = (HFile.getFormatVersion(regionServer.conf) >= 3);
 
       // Skip adding the edits to WAL if this is a secondary region replica
       boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
@@ -1450,7 +1452,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
           new Pair<WALKey, WALEdit>();
         List<WALSplitter.MutationReplay> edits = WALSplitter.getMutationsFromWALEntry(entry,
-          cells, walEntry, durability);
+          cells, walEntry, needAddReplayTag, durability);
         if (coprocessorHost != null) {
           // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
           // KeyValue.

http://git-wip-us.apache.org/repos/asf/hbase/blob/07334504/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index df76073..cc03e09 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -92,9 +92,8 @@ public class DefaultCompactor extends Compactor {
           smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
           cleanSeqId = true;
         }
-        
         writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
-            true, fd.maxTagsLength > 0);
+            fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0);
         boolean finished = performCompaction(scanner, writer, smallestReadPoint, cleanSeqId);
         if (!finished) {
           writer.close();

http://git-wip-us.apache.org/repos/asf/hbase/blob/07334504/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
index 40b4af0..a74bd3d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
@@ -112,13 +112,13 @@ public class StripeCompactor extends Compactor {
         smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
         cleanSeqId = true;
       }
-
+      final boolean needMvcc = fd.maxMVCCReadpoint >= smallestReadPoint;
       final Compression.Algorithm compression = store.getFamily().getCompactionCompression();
       StripeMultiFileWriter.WriterFactory factory = new StripeMultiFileWriter.WriterFactory() {
         @Override
         public Writer createWriter() throws IOException {
           return store.createWriterInTmp(
-              fd.maxKeyCount, compression, true, true, fd.maxTagsLength > 0);
+              fd.maxKeyCount, compression, true, needMvcc, fd.maxTagsLength > 0);
         }
       };
 

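With the revert, both DefaultCompactor and StripeCompactor pass fd.maxMVCCReadpoint >= smallestReadPoint
instead of a hard-coded true when creating the tmp writer: per-cell MVCC/sequence information is kept in
the compacted file only while some cell may still be newer than the oldest concurrent read point. A
minimal sketch of that decision follows; the field names mirror the diff, everything else is illustrative.

public class IncludeMvccSketch {
  // Keep per-cell MVCC/sequence ids in the compacted output only while at least
  // one cell's readpoint is not yet older than every reader's read point.
  static boolean includeMvccReadpoint(long maxMVCCReadpoint, long smallestReadPoint) {
    return maxMVCCReadpoint >= smallestReadPoint;
  }

  public static void main(String[] args) {
    // Every cell is older than all open readers: the ids can be dropped from the new file.
    System.out.println(includeMvccReadpoint(100L, 500L)); // false
    // Some cell may still be invisible to an older reader: the ids must be kept.
    System.out.println(includeMvccReadpoint(600L, 500L)); // true
  }
}
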
http://git-wip-us.apache.org/repos/asf/hbase/blob/07334504/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 3f381cc..ef7b6ff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -2135,6 +2135,34 @@ public class WALSplitter {
     public final long nonce;
   }
 
+  /**
+   * Tag each edit to be replayed with its original log sequence number
+   * @param seqId the original log sequence number of the edit
+   * @param cell the cell to tag
+   */
+  private static Cell tagReplayLogSequenceNumber(long seqId, Cell cell) {
+    // Tag puts with original sequence number if there is no LOG_REPLAY_TAG yet
+    boolean needAddRecoveryTag = true;
+    if (cell.getTagsLength() > 0) {
+      Tag tmpTag = Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength(),
+        TagType.LOG_REPLAY_TAG_TYPE);
+      if (tmpTag != null) {
+        // found an existing log replay tag so reuse it
+        needAddRecoveryTag = false;
+      }
+    }
+    if (needAddRecoveryTag) {
+      List<Tag> newTags = new ArrayList<Tag>();
+      Tag replayTag = new Tag(TagType.LOG_REPLAY_TAG_TYPE, Bytes.toBytes(seqId));
+      newTags.add(replayTag);
+      if (cell.getTagsLength() > 0) {
+        newTags.addAll(Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()));
+      }
+      return new TagRewriteCell(cell, Tag.fromList(newTags));
+    }
+    return cell;
+  }
+
   /**
    * This function is used to construct mutations from a WALEntry. It also reconstructs WALKey &
    * WALEdit from the passed in WALEntry
@@ -2142,11 +2170,12 @@ public class WALSplitter {
    * @param cells
    * @param logEntry pair of WALKey and WALEdit instance stores WALKey and WALEdit instances
    *          extracted from the passed in WALEntry.
+   * @param addLogReplayTag whether to tag each replayed cell with its original log sequence number
    * @return list of Pair<MutationType, Mutation> to be replayed
    * @throws IOException
    */
   public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, CellScanner cells,
-      Pair<WALKey, WALEdit> logEntry, Durability durability)
+      Pair<WALKey, WALEdit> logEntry, boolean addLogReplayTag, Durability durability)
           throws IOException {
 
     if (entry == null) {
@@ -2194,7 +2223,11 @@ public class WALSplitter {
       if (CellUtil.isDelete(cell)) {
         ((Delete) m).addDeleteMarker(cell);
       } else {
-        ((Put) m).add(cell);
+        Cell tmpNewCell = cell;
+        if (addLogReplayTag) {
+          tmpNewCell = tagReplayLogSequenceNumber(replaySeqId, cell);
+        }
+        ((Put) m).add(tmpNewCell);
       }
       m.setDurability(durability);
       previousCell = cell;

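tagReplayLogSequenceNumber() above attaches the edit's original log sequence number as a LOG_REPLAY tag
unless the cell already carries one, which is what lets the restored CellComparator logic order replayed
edits against live client edits. A simplified, self-contained sketch of that rule follows; a
Map<Byte, byte[]> stands in for the cell's tag list, and only the tag-type value 3 is taken from the diff.

import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.Map;

public class ReplayTagSketch {
  static final byte LOG_REPLAY_TAG_TYPE = (byte) 3; // value from TagType in the diff

  // Add a replay tag carrying the original sequence number unless one already exists,
  // mirroring the structure (not the API) of WALSplitter.tagReplayLogSequenceNumber().
  static Map<Byte, byte[]> tagReplayLogSequenceNumber(long seqId, Map<Byte, byte[]> tags) {
    if (tags.containsKey(LOG_REPLAY_TAG_TYPE)) {
      return tags; // an existing log replay tag is reused as-is
    }
    Map<Byte, byte[]> newTags = new LinkedHashMap<>(tags);
    newTags.put(LOG_REPLAY_TAG_TYPE, ByteBuffer.allocate(Long.BYTES).putLong(seqId).array());
    return newTags;
  }

  public static void main(String[] args) {
    Map<Byte, byte[]> tagged = tagReplayLogSequenceNumber(42L, new LinkedHashMap<>());
    System.out.println(ByteBuffer.wrap(tagged.get(LOG_REPLAY_TAG_TYPE)).getLong()); // 42
    // Tagging again with a different seqId keeps the original value: the existing tag wins.
    long again = ByteBuffer.wrap(
        tagReplayLogSequenceNumber(99L, tagged).get(LOG_REPLAY_TAG_TYPE)).getLong();
    System.out.println(again); // 42
  }
}
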
http://git-wip-us.apache.org/repos/asf/hbase/blob/07334504/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
index f37c1eb..19050d5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
@@ -1188,6 +1188,7 @@ public class TestDistributedLogSplitting {
     LOG.info("testSameVersionUpdatesRecovery");
     conf.setLong("hbase.regionserver.hlog.blocksize", 15 * 1024);
     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
+    conf.setInt("hfile.format.version", 3);
     startCluster(NUM_RS);
     final AtomicLong sequenceId = new AtomicLong(100);
     final int NUM_REGIONS_TO_CREATE = 40;
@@ -1281,10 +1282,11 @@ public class TestDistributedLogSplitting {
     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
     conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 30 * 1024);
     conf.setInt("hbase.hstore.compactionThreshold", 3);
+    conf.setInt("hfile.format.version", 3);
     startCluster(NUM_RS);
     final AtomicLong sequenceId = new AtomicLong(100);
     final int NUM_REGIONS_TO_CREATE = 40;
-    final int NUM_LOG_LINES = 2000;
+    final int NUM_LOG_LINES = 1000;
     // turn off load balancing to prevent regions from moving around otherwise
     // they will consume recovered.edits
     master.balanceSwitch(false);