You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2016/02/26 18:03:49 UTC
[2/2] hbase git commit: HBASE-15205 Do not find the replication scope
for every WAL#append() (Ram)
HBASE-15205 Do not find the replication scope for every WAL#append() (Ram)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8f2bd060
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8f2bd060
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8f2bd060
Branch: refs/heads/master
Commit: 8f2bd06019869a1738bcfd66066737cdb7802ca8
Parents: 538815d
Author: ramkrishna <ra...@gmail.com>
Authored: Fri Feb 26 22:30:55 2016 +0530
Committer: ramkrishna <ra...@gmail.com>
Committed: Fri Feb 26 22:30:55 2016 +0530
----------------------------------------------------------------------
.../hbase/protobuf/ReplicationProtbufUtil.java | 2 +-
.../hadoop/hbase/regionserver/HRegion.java | 89 +++++++++-----
.../hadoop/hbase/regionserver/HStore.java | 2 +-
.../hadoop/hbase/regionserver/wal/FSHLog.java | 13 +-
.../hbase/regionserver/wal/FSWALEntry.java | 10 +-
.../hadoop/hbase/regionserver/wal/HLogKey.java | 48 ++++++--
.../regionserver/wal/WALActionsListener.java | 8 +-
.../hadoop/hbase/regionserver/wal/WALUtil.java | 57 +++++----
.../hbase/replication/ScopeWALEntryFilter.java | 2 +-
.../replication/regionserver/Replication.java | 70 +++--------
.../hadoop/hbase/wal/DisabledWALProvider.java | 4 +-
.../java/org/apache/hadoop/hbase/wal/WAL.java | 9 +-
.../org/apache/hadoop/hbase/wal/WALKey.java | 121 +++++++++++++++----
.../apache/hadoop/hbase/wal/WALSplitter.java | 2 +-
.../org/apache/hadoop/hbase/TestIOFencing.java | 3 +-
.../hbase/coprocessor/TestWALObserver.java | 48 +++++---
.../hbase/mapreduce/TestHLogRecordReader.java | 7 +-
.../hbase/mapreduce/TestImportExport.java | 16 +--
.../hbase/mapreduce/TestWALRecordReader.java | 20 +--
.../master/TestDistributedLogSplitting.java | 9 +-
.../hadoop/hbase/regionserver/TestBulkLoad.java | 17 +--
.../hadoop/hbase/regionserver/TestHRegion.java | 16 +--
.../regionserver/TestHRegionReplayEvents.java | 6 +-
.../regionserver/TestHRegionServerBulkLoad.java | 3 +-
.../hbase/regionserver/TestWALLockup.java | 10 +-
.../hbase/regionserver/wal/TestFSHLog.java | 57 ++++++---
.../regionserver/wal/TestLogRollAbort.java | 12 +-
.../wal/TestLogRollingNoCluster.java | 11 +-
.../wal/TestWALActionsListener.java | 12 +-
.../hbase/regionserver/wal/TestWALReplay.java | 47 ++++---
.../hbase/replication/TestReplicationBase.java | 9 ++
.../replication/TestReplicationSmallTests.java | 13 +-
.../TestReplicationWALEntryFilters.java | 62 +++++-----
.../TestReplicationSourceManager.java | 57 +++++----
.../TestReplicationWALReaderManager.java | 13 +-
.../apache/hadoop/hbase/wal/FaultyFSLog.java | 7 +-
.../hbase/wal/TestDefaultWALProvider.java | 64 +++++++---
.../wal/TestDefaultWALProviderWithHLogKey.java | 7 +-
.../apache/hadoop/hbase/wal/TestSecureWAL.java | 11 +-
.../apache/hadoop/hbase/wal/TestWALFactory.java | 74 ++++++++----
.../hbase/wal/TestWALReaderOnSecureWAL.java | 11 +-
.../hbase/wal/WALPerformanceEvaluation.java | 15 ++-
42 files changed, 685 insertions(+), 389 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
index 91185af..8cb2237 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
@@ -134,7 +134,7 @@ public class ReplicationProtbufUtil {
keyBuilder.setOrigSequenceNumber(key.getOrigLogSeqNum());
}
WALEdit edit = entry.getEdit();
- NavigableMap<byte[], Integer> scopes = key.getScopes();
+ NavigableMap<byte[], Integer> scopes = key.getReplicationScopes();
if (scopes != null && !scopes.isEmpty()) {
for (Map.Entry<byte[], Integer> scope: scopes.entrySet()) {
scopeBuilder.setFamily(ByteStringer.wrap(scope.getKey()));
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index b70a4c3..406850e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -17,19 +17,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.io.Closeables;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.Service;
-import com.google.protobuf.TextFormat;
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
import java.io.EOFException;
import java.io.FileNotFoundException;
@@ -195,6 +183,20 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Closeables;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import com.google.protobuf.TextFormat;
+
@SuppressWarnings("deprecation")
@InterfaceAudience.Private
public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
@@ -583,6 +585,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private final MetricsRegionWrapperImpl metricsRegionWrapper;
private final Durability durability;
private final boolean regionStatsEnabled;
+ // Stores the replication scope of those column families of the table
+ // that have a non-default scope
+ private final NavigableMap<byte[], Integer> replicationScope = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
/**
* HRegion constructor. This constructor should only be used for testing and
@@ -661,6 +667,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true);
this.htableDescriptor = htd;
+ Set<byte[]> families = this.htableDescriptor.getFamiliesKeys();
+ for (byte[] family : families) {
+ if (!replicationScope.containsKey(family)) {
+ int scope = htd.getFamily(family).getScope();
+ // Only store those families that have a NON-DEFAULT scope
+ if (scope != REPLICATION_SCOPE_LOCAL) {
+ // Do a copy before storing it here.
+ replicationScope.put(Bytes.copy(family), scope);
+ }
+ }
+ }
this.rsServices = rsServices;
this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
setHTableSpecificConf();
@@ -971,7 +988,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
RegionEventDescriptor.EventType.REGION_OPEN, getRegionInfo(), openSeqId,
getRegionServerServices().getServerName(), storeFiles);
- WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionOpenDesc, mvcc);
+ WALUtil.writeRegionEventMarker(wal, getReplicationScope(), getRegionInfo(), regionOpenDesc,
+ mvcc);
}
private void writeRegionCloseMarker(WAL wal) throws IOException {
@@ -979,7 +997,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
RegionEventDescriptor regionEventDesc = ProtobufUtil.toRegionEventDescriptor(
RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), mvcc.getReadPoint(),
getRegionServerServices().getServerName(), storeFiles);
- WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionEventDesc, mvcc);
+ WALUtil.writeRegionEventMarker(wal, getReplicationScope(), getRegionInfo(), regionEventDesc,
+ mvcc);
// Store SeqId in HDFS when a region closes
// checking region folder exists is due to many tests which delete the table folder while a
@@ -2285,7 +2304,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH,
getRegionInfo(), flushOpSeqId, committedFiles);
// No sync. Sync is below where no updates lock and we do FlushAction.COMMIT_FLUSH
- WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, false, mvcc);
+ WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, false,
+ mvcc);
}
// Prepare flush (take a snapshot)
@@ -2334,7 +2354,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
try {
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
getRegionInfo(), flushOpSeqId, committedFiles);
- WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, false,
+ WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, false,
mvcc);
} catch (Throwable t) {
LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
@@ -2379,7 +2399,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.CANNOT_FLUSH,
getRegionInfo(), -1, new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR));
try {
- WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, true, mvcc);
+ WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, true,
+ mvcc);
return true;
} catch (IOException e) {
LOG.warn(getRegionInfo().getEncodedName() + " : "
@@ -2449,7 +2470,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// write flush marker to WAL. If fail, we should throw DroppedSnapshotException
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
getRegionInfo(), flushOpSeqId, committedFiles);
- WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, true, mvcc);
+ WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, true,
+ mvcc);
}
} catch (Throwable t) {
// An exception here means that the snapshot was not persisted.
@@ -2462,7 +2484,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
try {
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
getRegionInfo(), flushOpSeqId, committedFiles);
- WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, false, mvcc);
+ WALUtil.writeFlushMarker(wal, this.replicationScope, getRegionInfo(), desc, false, mvcc);
} catch (Throwable ex) {
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "failed writing ABORT_FLUSH marker to WAL", ex);
@@ -3139,13 +3161,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (!replay) {
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
- this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
- mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);
+ this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
+ mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc,
+ this.getReplicationScope());
}
// TODO: Use the doAppend methods below... complicated by the replay stuff above.
try {
- long txid =
- this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true);
+ long txid = this.wal.append(this.getRegionInfo(), walKey,
+ walEdit, true);
if (txid != 0) sync(txid, durability);
writeEntry = walKey.getWriteEntry();
} catch (IOException ioe) {
@@ -3271,8 +3294,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (!replay) throw new IOException("Multiple nonces per batch and not in replay");
WALKey walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), now, mutation.getClusterIds(),
- currentNonceGroup, currentNonce, mvcc);
- this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true);
+ currentNonceGroup, currentNonce, mvcc, this.getReplicationScope());
+ this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
// Complete the mvcc transaction started down in append else it will block others
this.mvcc.complete(walKey.getWriteEntry());
}
@@ -5389,7 +5412,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
WALProtos.BulkLoadDescriptor loadDescriptor = ProtobufUtil.toBulkLoadDescriptor(
this.getRegionInfo().getTable(),
ByteStringer.wrap(this.getRegionInfo().getEncodedNameAsBytes()), storeFiles, seqId);
- WALUtil.writeBulkLoadMarkerAndSync(this.wal, getTableDesc(), getRegionInfo(),
+ WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),
loadDescriptor, mvcc);
} catch (IOException ioe) {
if (this.rsServices != null) {
@@ -6319,6 +6342,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return r.openHRegion(reporter);
}
+ @VisibleForTesting
+ public NavigableMap<byte[], Integer> getReplicationScope() {
+ return this.replicationScope;
+ }
/**
* Useful when reopening a closed region (normally for unit tests)
@@ -7069,10 +7096,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// here instead of WALKey directly to support legacy coprocessors.
WALKey walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds,
- nonceGroup, nonce, mvcc);
+ nonceGroup, nonce, mvcc, this.getReplicationScope());
try {
long txid =
- this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true);
+ this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
// Call sync on our edit.
if (txid != 0) sync(txid, durability);
writeEntry = walKey.getWriteEntry();
@@ -7362,7 +7389,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT +
ClassSize.ARRAY +
- 46 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
+ 47 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
(14 * Bytes.SIZEOF_LONG) +
5 * Bytes.SIZEOF_BOOLEAN);
@@ -7385,7 +7412,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores
(2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock
MultiVersionConcurrencyControl.FIXED_SIZE // mvcc
- + ClassSize.TREEMAP // maxSeqIdInStores
+ + 2 * ClassSize.TREEMAP // maxSeqIdInStores, replicationScope
+ 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress
+ ClassSize.STORE_SERVICES // store services
;
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 22f99e9..7c71baf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1307,7 +1307,7 @@ public class HStore implements Store {
// Fix reaching into Region to get the maxWaitForSeqId.
// Does this method belong in Region altogether given it is making so many references up there?
// Could be Region#writeCompactionMarker(compactionDescriptor);
- WALUtil.writeCompactionMarker(this.region.getWAL(), this.region.getTableDesc(),
+ WALUtil.writeCompactionMarker(this.region.getWAL(), this.region.getReplicationScope(),
this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC());
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 09da8fc..f3f869c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -62,7 +62,6 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
import org.apache.hadoop.hbase.util.Bytes;
@@ -1083,8 +1082,8 @@ public class FSHLog implements WAL {
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH_EXCEPTION",
justification="Will never be null")
@Override
- public long append(final HTableDescriptor htd, final HRegionInfo hri, final WALKey key,
- final WALEdit edits, final boolean inMemstore) throws IOException {
+ public long append(final HRegionInfo hri,
+ final WALKey key, final WALEdit edits, final boolean inMemstore) throws IOException {
if (this.closed) throw new IOException("Cannot append; log is closed");
// Make a trace scope for the append. It is closed on other side of the ring buffer by the
// single consuming thread. Don't have to worry about it.
@@ -1100,7 +1099,7 @@ public class FSHLog implements WAL {
// Construction of FSWALEntry sets a latch. The latch is thrown just after we stamp the
// edit with its edit/sequence id.
// TODO: reuse FSWALEntry as we do SyncFuture rather create per append.
- entry = new FSWALEntry(sequence, key, edits, htd, hri, inMemstore);
+ entry = new FSWALEntry(sequence, key, edits, hri, inMemstore);
truck.loadPayload(entry, scope.detach());
} finally {
this.disruptor.getRingBuffer().publish(sequence);
@@ -1878,14 +1877,12 @@ public class FSHLog implements WAL {
entry.getEdit())) {
if (entry.getEdit().isReplay()) {
// Set replication scope null so that this won't be replicated
- entry.getKey().setScopes(null);
+ entry.getKey().serializeReplicationScope(false);
}
}
if (!listeners.isEmpty()) {
for (WALActionsListener i: listeners) {
- // TODO: Why does listener take a table description and CPs take a regioninfo? Fix.
- i.visitLogEntryBeforeWrite(entry.getHTableDescriptor(), entry.getKey(),
- entry.getEdit());
+ i.visitLogEntryBeforeWrite(entry.getKey(), entry.getEdit());
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
index 86a3c3d..06318f0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
@@ -26,7 +26,6 @@ import java.util.Set;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.util.Bytes;
@@ -51,15 +50,13 @@ class FSWALEntry extends Entry {
// they are only in memory and held here while passing over the ring buffer.
private final transient long sequence;
private final transient boolean inMemstore;
- private final transient HTableDescriptor htd;
private final transient HRegionInfo hri;
private final Set<byte[]> familyNames;
FSWALEntry(final long sequence, final WALKey key, final WALEdit edit,
- final HTableDescriptor htd, final HRegionInfo hri, final boolean inMemstore) {
+ final HRegionInfo hri, final boolean inMemstore) {
super(key, edit);
this.inMemstore = inMemstore;
- this.htd = htd;
this.hri = hri;
this.sequence = sequence;
if (inMemstore) {
@@ -71,6 +68,7 @@ class FSWALEntry extends Entry {
Set<byte[]> familySet = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
for (Cell cell : cells) {
if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
+ // TODO: Avoid this clone?
familySet.add(CellUtil.cloneFamily(cell));
}
}
@@ -89,10 +87,6 @@ class FSWALEntry extends Entry {
return this.inMemstore;
}
- HTableDescriptor getHTableDescriptor() {
- return this.htd;
- }
-
HRegionInfo getHRegionInfo() {
return this.hri;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
index 7c40323..d7bf4a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
@@ -24,6 +24,7 @@ import java.io.EOFException;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
+import java.util.NavigableMap;
import java.util.UUID;
import org.apache.commons.logging.Log;
@@ -67,7 +68,7 @@ public class HLogKey extends WALKey implements Writable {
}
public HLogKey(final byte[] encodedRegionName, final TableName tablename) {
- super(encodedRegionName, tablename);
+ super(encodedRegionName, tablename, null);
}
@VisibleForTesting
@@ -75,11 +76,15 @@ public class HLogKey extends WALKey implements Writable {
super(encodedRegionName, tablename, now);
}
- public HLogKey(final byte[] encodedRegionName,
- final TableName tablename,
- final long now,
- final MultiVersionConcurrencyControl mvcc) {
- super(encodedRegionName, tablename, now, mvcc);
+ @VisibleForTesting
+ public HLogKey(final byte[] encodedRegionName, final TableName tablename, final long now,
+ final NavigableMap<byte[], Integer> replicationScope) {
+ super(encodedRegionName, tablename, now, replicationScope);
+ }
+
+ public HLogKey(final byte[] encodedRegionName, final TableName tablename, final long now,
+ final MultiVersionConcurrencyControl mvcc, final NavigableMap<byte[], Integer> scopes) {
+ super(encodedRegionName, tablename, now, mvcc, scopes);
}
/**
@@ -111,6 +116,35 @@ public class HLogKey extends WALKey implements Writable {
* Create the log key for writing to somewhere.
* We maintain the tablename mainly for debugging purposes.
* A regionName is always a sub-table object.
+ * <p>Used by log splitting and snapshots.
+ *
+ * @param encodedRegionName Encoded name of the region as returned by
+ * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
+ * @param tablename - name of table
+ * @param logSeqNum - log sequence number
+ * @param now Time at which this edit was written.
+ * @param clusterIds the clusters that have consumed the change(used in Replication)
+ * @param nonceGroup the noncegroup
+ * @param nonce the nonce
+ * @param replicationScope replication scope of the region's non-default-scope column families
+ */
+ public HLogKey(
+ final byte[] encodedRegionName,
+ final TableName tablename,
+ long logSeqNum,
+ final long now,
+ List<UUID> clusterIds,
+ long nonceGroup,
+ long nonce,
+ MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> replicationScope) {
+ super(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc,
+ replicationScope);
+ }
+
+ /**
+ * Create the log key for writing to somewhere.
+ * We maintain the tablename mainly for debugging purposes.
+ * A regionName is always a sub-table object.
*
* @param encodedRegionName Encoded name of the region as returned by
* <code>HRegionInfo#getEncodedNameAsBytes()</code>.
@@ -192,7 +226,7 @@ public class HLogKey extends WALKey implements Writable {
// encodes the length of encodedRegionName.
// If < 0 we just read the version and the next vint is the length.
// @see Bytes#readByteArray(DataInput)
- setScopes(null); // writable HLogKey does not contain scopes
+ serializeReplicationScope(false); // writable HLogKey does not contain scopes
int len = WritableUtils.readVInt(in);
byte[] tablenameBytes = null;
if (len < 0) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
index db98083..a6452e2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
@@ -23,7 +23,6 @@ import java.io.IOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.wal.WALKey;
@@ -85,7 +84,6 @@ public interface WALActionsListener {
);
/**
- * @param htd
* @param logKey
* @param logEdit TODO: Retire this in favor of
* {@link #visitLogEntryBeforeWrite(HRegionInfo, WALKey, WALEdit)} It only exists to get
@@ -93,8 +91,7 @@ public interface WALActionsListener {
* <code>htd</code>.
* @throws IOException If failed to parse the WALEdit
*/
- void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit)
- throws IOException;
+ void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException;
/**
* For notification post append to the writer. Used by metrics system at least.
@@ -135,8 +132,7 @@ public interface WALActionsListener {
public void visitLogEntryBeforeWrite(HRegionInfo info, WALKey logKey, WALEdit logEdit) {}
@Override
- public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit)
- throws IOException {
+ public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException {
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
index f268422..197144d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
@@ -20,11 +20,11 @@
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
+import java.util.NavigableMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
@@ -58,10 +58,11 @@ public class WALUtil {
* <p>This write is for internal use only. Not for external client consumption.
* @param mvcc Used by WAL to get sequence Id for the waledit.
*/
- public static WALKey writeCompactionMarker(WAL wal, HTableDescriptor htd, HRegionInfo hri,
- final CompactionDescriptor c, MultiVersionConcurrencyControl mvcc)
+ public static WALKey writeCompactionMarker(WAL wal,
+ NavigableMap<byte[], Integer> replicationScope, HRegionInfo hri, final CompactionDescriptor c,
+ MultiVersionConcurrencyControl mvcc)
throws IOException {
- WALKey walKey = writeMarker(wal, htd, hri, WALEdit.createCompaction(hri, c), mvcc);
+ WALKey walKey = writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
}
@@ -73,11 +74,11 @@ public class WALUtil {
*
* <p>This write is for internal use only. Not for external client consumption.
*/
- public static WALKey writeFlushMarker(WAL wal, HTableDescriptor htd, HRegionInfo hri,
- final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc)
- throws IOException {
- WALKey walKey =
- doFullAppendTransaction(wal, htd, hri, WALEdit.createFlushWALEdit(hri, f), mvcc, sync);
+ public static WALKey writeFlushMarker(WAL wal, NavigableMap<byte[], Integer> replicationScope,
+ HRegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc)
+ throws IOException {
+ WALKey walKey = doFullAppendTransaction(wal, replicationScope, hri,
+ WALEdit.createFlushWALEdit(hri, f), mvcc, sync);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
}
@@ -88,10 +89,12 @@ public class WALUtil {
* Write a region open marker indicating that the region is opened.
* This write is for internal use only. Not for external client consumption.
*/
- public static WALKey writeRegionEventMarker(WAL wal, HTableDescriptor htd, HRegionInfo hri,
+ public static WALKey writeRegionEventMarker(WAL wal,
+ NavigableMap<byte[], Integer> replicationScope, HRegionInfo hri,
final RegionEventDescriptor r, final MultiVersionConcurrencyControl mvcc)
throws IOException {
- WALKey walKey = writeMarker(wal, htd, hri, WALEdit.createRegionEventWALEdit(hri, r), mvcc);
+ WALKey walKey = writeMarker(wal, replicationScope, hri,
+ WALEdit.createRegionEventWALEdit(hri, r), mvcc);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r));
}
@@ -102,28 +105,30 @@ public class WALUtil {
* Write a log marker that a bulk load has succeeded and is about to be committed.
* This write is for internal use only. Not for external client consumption.
* @param wal The log to write into.
- * @param htd A description of the table that we are bulk loading into.
+ * @param replicationScope The replication scope of the families in the HRegion
* @param hri A description of the region in the table that we are bulk loading into.
* @param desc A protocol buffers based description of the client's bulk loading request
* @return walKey with sequenceid filled out for this bulk load marker
* @throws IOException We will throw an IOException if we can not append to the HLog.
*/
- public static WALKey writeBulkLoadMarkerAndSync(final WAL wal, final HTableDescriptor htd,
- final HRegionInfo hri, final WALProtos.BulkLoadDescriptor desc,
- final MultiVersionConcurrencyControl mvcc)
- throws IOException {
- WALKey walKey = writeMarker(wal, htd, hri, WALEdit.createBulkLoadEvent(hri, desc), mvcc);
+ public static WALKey writeBulkLoadMarkerAndSync(final WAL wal,
+ final NavigableMap<byte[], Integer> replicationScope, final HRegionInfo hri,
+ final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc)
+ throws IOException {
+ WALKey walKey = writeMarker(wal, replicationScope, hri, WALEdit.createBulkLoadEvent(hri, desc),
+ mvcc);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(desc));
}
return walKey;
}
- private static WALKey writeMarker(final WAL wal, final HTableDescriptor htd,
- final HRegionInfo hri, final WALEdit edit, final MultiVersionConcurrencyControl mvcc)
+ private static WALKey writeMarker(final WAL wal,
+ final NavigableMap<byte[], Integer> replicationScope, final HRegionInfo hri,
+ final WALEdit edit, final MultiVersionConcurrencyControl mvcc)
throws IOException {
// If sync == true in below, then timeout is not used; safe to pass UNSPECIFIED_TIMEOUT
- return doFullAppendTransaction(wal, htd, hri, edit, mvcc, true);
+ return doFullAppendTransaction(wal, replicationScope, hri, edit, mvcc, true);
}
/**
@@ -134,16 +139,16 @@ public class WALUtil {
* <p>This write is for internal use only. Not for external client consumption.
* @return WALKey that was added to the WAL.
*/
- public static WALKey doFullAppendTransaction(final WAL wal, final HTableDescriptor htd,
- final HRegionInfo hri, final WALEdit edit, final MultiVersionConcurrencyControl mvcc,
- final boolean sync)
+ public static WALKey doFullAppendTransaction(final WAL wal,
+ final NavigableMap<byte[], Integer> replicationScope, final HRegionInfo hri,
+ final WALEdit edit, final MultiVersionConcurrencyControl mvcc, final boolean sync)
throws IOException {
// TODO: Pass in current time to use?
- WALKey walKey =
- new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), System.currentTimeMillis(), mvcc);
+ WALKey walKey = new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(),
+ System.currentTimeMillis(), mvcc, replicationScope);
long trx = MultiVersionConcurrencyControl.NONE;
try {
- trx = wal.append(htd, hri, walKey, edit, false);
+ trx = wal.append(hri, walKey, edit, false);
if (sync) {
wal.sync(trx);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
index f97ec15..28a83dd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
@@ -44,7 +44,7 @@ public class ScopeWALEntryFilter implements WALEntryFilter {
@Override
public Entry filter(Entry entry) {
- NavigableMap<byte[], Integer> scopes = entry.getKey().getScopes();
+ NavigableMap<byte[], Integer> scopes = entry.getKey().getReplicationScopes();
if (scopes == null || scopes.isEmpty()) {
return null;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index a5d2446..bb4a5a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -20,13 +20,10 @@ package org.apache.hadoop.hbase.replication.regionserver;
import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_ENABLE_KEY;
-import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.NavigableMap;
-import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -43,7 +40,6 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
@@ -61,7 +57,6 @@ import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.zookeeper.KeeperException;
@@ -257,72 +252,47 @@ public class Replication extends WALActionsListener.Base implements
}
@Override
- public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit)
- throws IOException {
- scopeWALEdits(htd, logKey, logEdit, this.conf, this.getReplicationManager());
+ public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException {
+ scopeWALEdits(logKey, logEdit, this.conf, this.getReplicationManager());
}
/**
* Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys from
* compaction WAL edits and if the scope is local.
- * @param htd Descriptor used to find the scope to use
* @param logKey Key that may get scoped according to its edits
* @param logEdit Edits used to lookup the scopes
* @param replicationManager Manager used to add bulk load events hfile references
* @throws IOException If failed to parse the WALEdit
*/
- public static void scopeWALEdits(HTableDescriptor htd, WALKey logKey, WALEdit logEdit,
- Configuration conf, ReplicationSourceManager replicationManager) throws IOException {
- NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
- byte[] family;
+ public static void scopeWALEdits(WALKey logKey,
+ WALEdit logEdit, Configuration conf, ReplicationSourceManager replicationManager)
+ throws IOException {
boolean replicationForBulkLoadEnabled = isReplicationForBulkLoadDataEnabled(conf);
+ byte[] family;
+ boolean foundOtherEdits = false;
for (Cell cell : logEdit.getCells()) {
if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
if (replicationForBulkLoadEnabled && CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
- scopeBulkLoadEdits(htd, replicationManager, scopes, logKey.getTablename(), cell);
+ try {
+ BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+ for (StoreDescriptor s : bld.getStoresList()) {
+ family = s.getFamilyName().toByteArray();
+ addHFileRefsToQueue(replicationManager, logKey.getTablename(), family, s);
+ }
+ } catch (IOException e) {
+ LOG.error("Failed to get bulk load events information from the wal file.", e);
+ throw e;
+ }
} else {
// Skip the flush/compaction/region events
continue;
}
} else {
- family = CellUtil.cloneFamily(cell);
- // Unexpected, has a tendency to happen in unit tests
- assert htd.getFamily(family) != null;
-
- if (!scopes.containsKey(family)) {
- int scope = htd.getFamily(family).getScope();
- if (scope != REPLICATION_SCOPE_LOCAL) {
- scopes.put(family, scope);
- }
- }
+ foundOtherEdits = true;
}
}
- if (!scopes.isEmpty() && !logEdit.isReplay()) {
- logKey.setScopes(scopes);
- }
- }
-
- private static void scopeBulkLoadEdits(HTableDescriptor htd,
- ReplicationSourceManager replicationManager, NavigableMap<byte[], Integer> scopes,
- TableName tableName, Cell cell) throws IOException {
- byte[] family;
- try {
- BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
- for (StoreDescriptor s : bld.getStoresList()) {
- family = s.getFamilyName().toByteArray();
- if (!scopes.containsKey(family)) {
- int scope = htd.getFamily(family).getScope();
- if (scope != REPLICATION_SCOPE_LOCAL) {
- scopes.put(family, scope);
- addHFileRefsToQueue(replicationManager, tableName, family, s);
- }
- } else {
- addHFileRefsToQueue(replicationManager, tableName, family, s);
- }
- }
- } catch (IOException e) {
- LOG.error("Failed to get bulk load events information from the wal file.", e);
- throw e;
+ if ((!replicationForBulkLoadEnabled && !foundOtherEdits) || logEdit.isReplay()) {
+ logKey.serializeReplicationScope(false);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index 0c41e77..c3d4b2c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.util.FSUtils;
// imports for things that haven't moved from regionserver.wal yet.
@@ -154,8 +153,7 @@ class DisabledWALProvider implements WALProvider {
}
@Override
- public long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits,
- boolean inMemstore) {
+ public long append(HRegionInfo info, WALKey key, WALEdit edits, boolean inMemstore) {
if (!this.listeners.isEmpty()) {
final long start = System.nanoTime();
long len = 0;
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index d2b336e..0b83528 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -25,7 +25,6 @@ import java.util.Set;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
// imports we use from yet-to-be-moved regionsever.wal
@@ -106,21 +105,17 @@ public interface WAL {
* completes BUT on return this edit must have its region edit/sequence id assigned
* else it messes up our unification of mvcc and sequenceid. On return <code>key</code> will
* have the region edit/sequence id filled in.
- * @param info
+ * @param info the regioninfo associated with append
* @param key Modified by this call; we add to it this edits region edit/sequence id.
* @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
* sequence id that is after all currently appended edits.
- * @param htd used to give scope for replication TODO refactor out in favor of table name and
- * info
* @param inMemstore Always true except for case where we are writing a compaction completion
* record into the WAL; in this case the entry is just so we can finish an unfinished compaction
* -- it is not an edit for memstore.
* @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
* in it.
*/
- long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits,
- boolean inMemstore)
- throws IOException;
+ long append(HRegionInfo info, WALKey key, WALEdit edits, boolean inMemstore) throws IOException;
/**
* Sync what we have in the WAL.
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
index 09096fe..86fdfbd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
@@ -193,7 +193,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
@InterfaceAudience.Private
protected List<UUID> clusterIds;
- private NavigableMap<byte[], Integer> scopes;
+ private NavigableMap<byte[], Integer> replicationScope;
private long nonceGroup = HConstants.NO_NONCE;
private long nonce = HConstants.NO_NONCE;
@@ -210,7 +210,12 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
public WALKey() {
init(null, null, 0L, HConstants.LATEST_TIMESTAMP,
- new ArrayList<UUID>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null);
+ new ArrayList<UUID>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, null);
+ }
+
+ public WALKey(final NavigableMap<byte[], Integer> replicationScope) {
+ init(null, null, 0L, HConstants.LATEST_TIMESTAMP,
+ new ArrayList<UUID>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, replicationScope);
}
@VisibleForTesting
@@ -220,15 +225,16 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
List<UUID> clusterIds = new ArrayList<UUID>();
clusterIds.add(clusterId);
init(encodedRegionName, tablename, logSeqNum, now, clusterIds,
- HConstants.NO_NONCE, HConstants.NO_NONCE, null);
+ HConstants.NO_NONCE, HConstants.NO_NONCE, null, null);
}
/**
* @deprecated Remove. Useless.
*/
@Deprecated // REMOVE
- public WALKey(final byte[] encodedRegionName, final TableName tablename) {
- this(encodedRegionName, tablename, System.currentTimeMillis());
+ public WALKey(final byte[] encodedRegionName, final TableName tablename,
+ final NavigableMap<byte[], Integer> replicationScope) {
+ this(encodedRegionName, tablename, System.currentTimeMillis(), replicationScope);
}
// TODO: Fix being able to pass in sequenceid.
@@ -240,7 +246,20 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
EMPTY_UUIDS,
HConstants.NO_NONCE,
HConstants.NO_NONCE,
- null);
+ null, null);
+ }
+
+ // TODO: Fix being able to pass in sequenceid.
+ public WALKey(final byte[] encodedRegionName, final TableName tablename, final long now,
+ final NavigableMap<byte[], Integer> replicationScope) {
+ init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE,
+ HConstants.NO_NONCE, null, replicationScope);
+ }
+
+ public WALKey(final byte[] encodedRegionName, final TableName tablename, final long now,
+ MultiVersionConcurrencyControl mvcc, final NavigableMap<byte[], Integer> replicationScope) {
+ init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE,
+ HConstants.NO_NONCE, mvcc, replicationScope);
}
public WALKey(final byte[] encodedRegionName,
@@ -254,7 +273,33 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
EMPTY_UUIDS,
HConstants.NO_NONCE,
HConstants.NO_NONCE,
- mvcc);
+ mvcc, null);
+ }
+
+ /**
+ * Create the log key for writing to somewhere.
+ * We maintain the tablename mainly for debugging purposes.
+ * A regionName is always a sub-table object.
+ * <p>Used by log splitting and snapshots.
+ *
+ * @param encodedRegionName Encoded name of the region as returned by
+ * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
+ * @param tablename - name of table
+ * @param logSeqNum - log sequence number
+ * @param now Time at which this edit was written.
+ * @param clusterIds the clusters that have consumed the change(used in Replication)
+ * @param nonceGroup the nonceGroup
+ * @param nonce the nonce
+ * @param mvcc the mvcc associated with the WALKey
+ * @param replicationScope the non-default replication scope
+ * associated with the region's column families
+ */
+ // TODO: Fix being able to pass in sequenceid.
+ public WALKey(final byte[] encodedRegionName, final TableName tablename, long logSeqNum,
+ final long now, List<UUID> clusterIds, long nonceGroup, long nonce,
+ MultiVersionConcurrencyControl mvcc, final NavigableMap<byte[], Integer> replicationScope) {
+ init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc,
+ replicationScope);
}
/**
@@ -279,7 +324,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
long nonceGroup,
long nonce,
MultiVersionConcurrencyControl mvcc) {
- init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc);
+ init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc, null);
}
/**
@@ -289,7 +334,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
*
* @param encodedRegionName Encoded name of the region as returned by
* <code>HRegionInfo#getEncodedNameAsBytes()</code>.
- * @param tablename
+ * @param tablename the tablename
* @param now Time at which this edit was written.
* @param clusterIds the clusters that have consumed the change(used in Replication)
* @param nonceGroup
@@ -299,7 +344,31 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
public WALKey(final byte[] encodedRegionName, final TableName tablename,
final long now, List<UUID> clusterIds, long nonceGroup,
final long nonce, final MultiVersionConcurrencyControl mvcc) {
- init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc);
+ init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc,
+ null);
+ }
+
+ /**
+ * Create the log key for writing to somewhere.
+ * We maintain the tablename mainly for debugging purposes.
+ * A regionName is always a sub-table object.
+ *
+ * @param encodedRegionName Encoded name of the region as returned by
+ * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
+ * @param tablename the tablename
+ * @param now Time at which this edit was written.
+ * @param clusterIds the clusters that have consumed the change(used in Replication)
+ * @param nonceGroup the nonceGroup
+ * @param nonce the nonce
+ * @param mvcc mvcc control used to generate sequence numbers and control read/write points
+ * @param replicationScope the non-default replication scope of the column families
+ */
+ public WALKey(final byte[] encodedRegionName, final TableName tablename,
+ final long now, List<UUID> clusterIds, long nonceGroup,
+ final long nonce, final MultiVersionConcurrencyControl mvcc,
+ NavigableMap<byte[], Integer> replicationScope) {
+ init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc,
+ replicationScope);
}
/**
@@ -328,7 +397,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
EMPTY_UUIDS,
nonceGroup,
nonce,
- mvcc);
+ mvcc, null);
}
@InterfaceAudience.Private
@@ -339,7 +408,8 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
List<UUID> clusterIds,
long nonceGroup,
long nonce,
- MultiVersionConcurrencyControl mvcc) {
+ MultiVersionConcurrencyControl mvcc,
+ NavigableMap<byte[], Integer> replicationScope) {
this.sequenceId = logSeqNum;
this.writeTime = now;
this.clusterIds = clusterIds;
@@ -351,6 +421,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
if (logSeqNum != NO_SEQUENCE_ID) {
setSequenceId(logSeqNum);
}
+ this.replicationScope = replicationScope;
}
// For HLogKey and deserialization. DO NOT USE. See setWriteEntry below.
@@ -418,8 +489,8 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
return this.writeTime;
}
- public NavigableMap<byte[], Integer> getScopes() {
- return scopes;
+ public NavigableMap<byte[], Integer> getReplicationScopes() {
+ return replicationScope;
}
/** @return The nonce group */
@@ -432,8 +503,14 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
return nonce;
}
- public void setScopes(NavigableMap<byte[], Integer> scopes) {
- this.scopes = scopes;
+ private void setReplicationScope(NavigableMap<byte[], Integer> replicationScope) {
+ this.replicationScope = replicationScope;
+ }
+
+ public void serializeReplicationScope(boolean serialize) {
+ if (!serialize) {
+ setReplicationScope(null);
+ }
}
public void readOlderScopes(NavigableMap<byte[], Integer> scopes) {
@@ -450,7 +527,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
}
}
if (scopes.size() > 0) {
- this.scopes = scopes;
+ this.replicationScope = scopes;
}
}
}
@@ -598,8 +675,8 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
builder.addClusterIds(uuidBuilder.build());
}
- if (scopes != null) {
- for (Map.Entry<byte[], Integer> e : scopes.entrySet()) {
+ if (replicationScope != null) {
+ for (Map.Entry<byte[], Integer> e : replicationScope.entrySet()) {
ByteString family = (compressionContext == null) ? ByteStringer.wrap(e.getKey())
: compressor.compress(e.getKey(), compressionContext.familyDict);
builder.addScopes(FamilyScope.newBuilder()
@@ -638,13 +715,13 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
if (walKey.hasNonce()) {
this.nonce = walKey.getNonce();
}
- this.scopes = null;
+ this.replicationScope = null;
if (walKey.getScopesCount() > 0) {
- this.scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+ this.replicationScope = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
for (FamilyScope scope : walKey.getScopesList()) {
byte[] family = (compressionContext == null) ? scope.getFamily().toByteArray() :
uncompressor.uncompress(scope.getFamily(), compressionContext.familyDict);
- this.scopes.put(family, scope.getScopeType().getNumber());
+ this.replicationScope.put(family, scope.getScopeType().getNumber());
}
}
setSequenceId(walKey.getLogSequenceNumber());
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 010fd37..ad5774f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -1778,7 +1778,7 @@ public class WALSplitter {
WALEdit edit = entry.getEdit();
TableName table = entry.getKey().getTablename();
// clear scopes which isn't needed for recovery
- entry.getKey().setScopes(null);
+ entry.getKey().serializeReplicationScope(false);
String encodeRegionNameStr = Bytes.toString(entry.getKey().getEncodedRegionName());
// skip edits of non-existent tables
if (nonExistentTables != null && nonExistentTables.contains(table)) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index 3aae5d5..d8363d4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -281,7 +281,8 @@ public class TestIOFencing {
CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(oldHri,
FAMILY, Lists.newArrayList(new Path("/a")), Lists.newArrayList(new Path("/b")),
new Path("store_dir"));
- WALUtil.writeCompactionMarker(compactingRegion.getWAL(), table.getTableDescriptor(),
+ WALUtil.writeCompactionMarker(compactingRegion.getWAL(),
+ ((HRegion)compactingRegion).getReplicationScope(),
oldHri, compactionDescriptor, compactingRegion.getMVCC());
// Wait till flush has happened, otherwise there won't be multiple store files
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
index 75fe7a2..03760d1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
@@ -29,6 +29,8 @@ import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -184,7 +186,11 @@ public class TestWALObserver {
HRegionInfo hri = createBasic3FamilyHRegionInfo(Bytes.toString(TEST_TABLE));
final HTableDescriptor htd = createBasic3FamilyHTD(Bytes
.toString(TEST_TABLE));
-
+ NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
+ for(byte[] fam : htd.getFamiliesKeys()) {
+ scopes.put(fam, 0);
+ }
Path basedir = new Path(this.hbaseRootDir, Bytes.toString(TEST_TABLE));
deleteDir(basedir);
fs.mkdirs(new Path(basedir, hri.getEncodedName()));
@@ -235,8 +241,8 @@ public class TestWALObserver {
// it's where WAL write cp should occur.
long now = EnvironmentEdgeManager.currentTime();
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
- long txid = log.append(htd, hri, new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now),
- edit, true);
+ long txid = log.append(hri,
+ new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now, scopes), edit, true);
log.sync(txid);
// the edit shall have been change now by the coprocessor.
@@ -296,10 +302,15 @@ public class TestWALObserver {
assertFalse(oldApi.isPostWALWriteDeprecatedCalled());
LOG.debug("writing to WAL with non-legacy keys.");
+ NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
+ for (HColumnDescriptor hcd : htd.getFamilies()) {
+ scopes.put(hcd.getName(), 0);
+ }
final int countPerFamily = 5;
for (HColumnDescriptor hcd : htd.getFamilies()) {
addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
- EnvironmentEdgeManager.getDelegate(), wal, htd, mvcc);
+ EnvironmentEdgeManager.getDelegate(), wal, scopes, mvcc);
}
LOG.debug("Verify that only the non-legacy CP saw edits.");
@@ -323,7 +334,7 @@ public class TestWALObserver {
final WALEdit edit = new WALEdit();
final byte[] nonce = Bytes.toBytes("1772");
edit.add(new KeyValue(TEST_ROW, TEST_FAMILY[0], nonce, now, nonce));
- final long txid = wal.append(htd, hri, legacyKey, edit, true);
+ final long txid = wal.append(hri, legacyKey, edit, true);
wal.sync(txid);
LOG.debug("Make sure legacy cps can see supported edits after having been skipped.");
@@ -349,7 +360,11 @@ public class TestWALObserver {
final HRegionInfo hri = createBasic3FamilyHRegionInfo(Bytes.toString(TEST_TABLE));
final HTableDescriptor htd = createBasic3FamilyHTD(Bytes.toString(TEST_TABLE));
final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
-
+ NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
+ for(byte[] fam : htd.getFamiliesKeys()) {
+ scopes.put(fam, 0);
+ }
WAL log = wals.getWAL(UNSPECIFIED_REGION, null);
try {
SampleRegionWALObserver cp = getCoprocessor(log, SampleRegionWALObserver.class);
@@ -360,8 +375,8 @@ public class TestWALObserver {
assertFalse(cp.isPostWALWriteCalled());
final long now = EnvironmentEdgeManager.currentTime();
- long txid = log.append(htd, hri,
- new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc),
+ long txid = log.append(hri,
+ new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes),
new WALEdit(), true);
log.sync(txid);
@@ -400,14 +415,18 @@ public class TestWALObserver {
// Put p = creatPutWith2Families(TEST_ROW);
WALEdit edit = new WALEdit();
long now = EnvironmentEdgeManager.currentTime();
- // addFamilyMapToWALEdit(p.getFamilyMap(), edit);
final int countPerFamily = 1000;
- // for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) {
+ NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
+ for (HColumnDescriptor hcd : htd.getFamilies()) {
+ scopes.put(hcd.getName(), 0);
+ }
for (HColumnDescriptor hcd : htd.getFamilies()) {
addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
- EnvironmentEdgeManager.getDelegate(), wal, htd, mvcc);
+ EnvironmentEdgeManager.getDelegate(), wal, scopes, mvcc);
}
- wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc), edit, true);
+ wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit,
+ true);
// sync to fs.
wal.sync();
@@ -527,7 +546,8 @@ public class TestWALObserver {
private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName,
final byte[] family, final int count, EnvironmentEdge ee, final WAL wal,
- final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc) throws IOException {
+ final NavigableMap<byte[], Integer> scopes, final MultiVersionConcurrencyControl mvcc)
+ throws IOException {
String familyStr = Bytes.toString(family);
long txid = -1;
for (int j = 0; j < count; j++) {
@@ -537,7 +557,7 @@ public class TestWALObserver {
edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes));
// uses WALKey instead of HLogKey on purpose. will only work for tests where we don't care
// about legacy coprocessors
- txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
+ txid = wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
ee.currentTime(), mvcc), edit, true);
}
if (-1 != txid) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
index 5fa588b..752faa6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
@@ -18,6 +18,9 @@
package org.apache.hadoop.hbase.mapreduce;
import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALRecordReader;
+
+import java.util.NavigableMap;
+
import org.apache.hadoop.hbase.mapreduce.HLogInputFormat.HLogKeyRecordReader;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.wal.WALKey;
@@ -32,8 +35,8 @@ import org.junit.experimental.categories.Category;
public class TestHLogRecordReader extends TestWALRecordReader {
@Override
- protected WALKey getWalKey(final long time) {
- return new HLogKey(info.getEncodedNameAsBytes(), tableName, time, mvcc);
+ protected WALKey getWalKey(final long time, NavigableMap<byte[], Integer> scopes) {
+ return new HLogKey(info.getEncodedNameAsBytes(), tableName, time, mvcc, scopes);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
index 05f9f36..094fe1c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
@@ -34,6 +34,7 @@ import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.NavigableMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -656,9 +657,9 @@ public class TestImportExport {
Table importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3);
// Register the wal listener for the import table
- TableWALActionListener walListener = new TableWALActionListener(importTableName);
HRegionInfo region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
.getOnlineRegions(importTable.getName()).get(0).getRegionInfo();
+ TableWALActionListener walListener = new TableWALActionListener(region);
WAL wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
wal.registerWALActionsListener(walListener);
@@ -678,7 +679,7 @@ public class TestImportExport {
region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
.getOnlineRegions(importTable.getName()).get(0).getRegionInfo();
wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
- walListener = new TableWALActionListener(importTableName);
+ walListener = new TableWALActionListener(region);
wal.registerWALActionsListener(walListener);
args = new String[] { importTableName, FQ_OUTPUT_DIR };
assertTrue(runImport(args));
@@ -695,16 +696,17 @@ public class TestImportExport {
*/
private static class TableWALActionListener extends WALActionsListener.Base {
- private String tableName;
+ private HRegionInfo regionInfo;
private boolean isVisited = false;
- public TableWALActionListener(String tableName) {
- this.tableName = tableName;
+ public TableWALActionListener(HRegionInfo region) {
+ this.regionInfo = region;
}
@Override
- public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) {
- if (tableName.equalsIgnoreCase(htd.getNameAsString()) && (!logEdit.isMetaEdit())) {
+ public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) {
+ if (logKey.getTablename().getNameAsString().equalsIgnoreCase(
+ this.regionInfo.getTable().getNameAsString()) && (!logEdit.isMetaEdit())) {
isVisited = true;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
index a4381c8..aee2a06 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
@@ -77,6 +79,8 @@ public class TestWALRecordReader {
private static HTableDescriptor htd;
private static Path logDir;
protected MultiVersionConcurrencyControl mvcc;
+ protected static NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
private static String getName() {
return "TestWALRecordReader";
@@ -128,10 +132,10 @@ public class TestWALRecordReader {
long ts = System.currentTimeMillis();
WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
- log.append(htd, info, getWalKey(ts), edit, true);
+ log.append(info, getWalKey(ts, scopes), edit, true);
edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value));
- log.append(htd, info, getWalKey(ts+1), edit, true);
+ log.append(info, getWalKey(ts+1, scopes), edit, true);
log.sync();
LOG.info("Before 1st WAL roll " + log.toString());
log.rollWriter();
@@ -142,10 +146,10 @@ public class TestWALRecordReader {
edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value));
- log.append(htd, info, getWalKey(ts1+1), edit, true);
+ log.append(info, getWalKey(ts1+1, scopes), edit, true);
edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value));
- log.append(htd, info, getWalKey(ts1+2), edit, true);
+ log.append(info, getWalKey(ts1+2, scopes), edit, true);
log.sync();
log.shutdown();
walfactory.shutdown();
@@ -187,7 +191,7 @@ public class TestWALRecordReader {
WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
System.currentTimeMillis(), value));
- long txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, true);
+ long txid = log.append(info, getWalKey(System.currentTimeMillis(), scopes), edit, true);
log.sync(txid);
Thread.sleep(1); // make sure 2nd log gets a later timestamp
@@ -197,7 +201,7 @@ public class TestWALRecordReader {
edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
System.currentTimeMillis(), value));
- txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, true);
+ txid = log.append(info, getWalKey(System.currentTimeMillis(), scopes), edit, true);
log.sync(txid);
log.shutdown();
walfactory.shutdown();
@@ -236,8 +240,8 @@ public class TestWALRecordReader {
testSplit(splits.get(1));
}
- protected WALKey getWalKey(final long time) {
- return new WALKey(info.getEncodedNameAsBytes(), tableName, time, mvcc);
+ protected WALKey getWalKey(final long time, NavigableMap<byte[], Integer> scopes) {
+ return new WALKey(info.getEncodedNameAsBytes(), tableName, time, mvcc, scopes);
}
protected WALRecordReader getReader() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
index c5728cf..cff8db0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
@@ -1301,9 +1301,8 @@ public class TestDistributedLogSplitting {
WALEdit e = new WALEdit();
value++;
e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value)));
- wal.append(htd, curRegionInfo,
- new HLogKey(curRegionInfo.getEncodedNameAsBytes(), tableName, System.currentTimeMillis()),
- e, true);
+ wal.append(curRegionInfo, new HLogKey(curRegionInfo.getEncodedNameAsBytes(), tableName,
+ System.currentTimeMillis(), null), e, true);
}
wal.sync();
wal.shutdown();
@@ -1397,7 +1396,7 @@ public class TestDistributedLogSplitting {
WALEdit e = new WALEdit();
value++;
e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value)));
- wal.append(htd, curRegionInfo, new HLogKey(curRegionInfo.getEncodedNameAsBytes(),
+ wal.append(curRegionInfo, new HLogKey(curRegionInfo.getEncodedNameAsBytes(),
tableName, System.currentTimeMillis()), e, true);
}
wal.sync();
@@ -1609,7 +1608,7 @@ public class TestDistributedLogSplitting {
// key
byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i));
e.add(new KeyValue(row, family, qualifier, System.currentTimeMillis(), value));
- log.append(htd, curRegionInfo,
+ log.append(curRegionInfo,
new HLogKey(curRegionInfo.getEncodedNameAsBytes(), fullTName,
System.currentTimeMillis()), e, true);
if (0 == i % syncEvery) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
index 3a7aff0..d0633a8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
@@ -87,6 +87,7 @@ public class TestBulkLoad {
private final byte[] randomBytes = new byte[100];
private final byte[] family1 = Bytes.toBytes("family1");
private final byte[] family2 = Bytes.toBytes("family2");
+
@Rule
public TestName name = new TestName();
@@ -105,12 +106,12 @@ public class TestBulkLoad {
storeFileName = (new Path(storeFileName)).getName();
List<String> storeFileNames = new ArrayList<String>();
storeFileNames.add(storeFileName);
- when(log.append(any(HTableDescriptor.class), any(HRegionInfo.class), any(WALKey.class),
+ when(log.append(any(HRegionInfo.class), any(WALKey.class),
argThat(bulkLogWalEdit(WALEdit.BULK_LOAD, tableName.toBytes(),
familyName, storeFileNames)),
any(boolean.class))).thenAnswer(new Answer() {
public Object answer(InvocationOnMock invocation) {
- WALKey walKey = invocation.getArgumentAt(2, WALKey.class);
+ WALKey walKey = invocation.getArgumentAt(1, WALKey.class);
MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
if (mvcc != null) {
MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
@@ -132,11 +133,11 @@ public class TestBulkLoad {
@Test
public void shouldBulkLoadSingleFamilyHLog() throws IOException {
- when(log.append(any(HTableDescriptor.class), any(HRegionInfo.class),
+ when(log.append(any(HRegionInfo.class),
any(WALKey.class), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)),
any(boolean.class))).thenAnswer(new Answer() {
public Object answer(InvocationOnMock invocation) {
- WALKey walKey = invocation.getArgumentAt(2, WALKey.class);
+ WALKey walKey = invocation.getArgumentAt(1, WALKey.class);
MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
if (mvcc != null) {
MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
@@ -151,11 +152,11 @@ public class TestBulkLoad {
@Test
public void shouldBulkLoadManyFamilyHLog() throws IOException {
- when(log.append(any(HTableDescriptor.class), any(HRegionInfo.class),
+ when(log.append(any(HRegionInfo.class),
any(WALKey.class), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)),
any(boolean.class))).thenAnswer(new Answer() {
public Object answer(InvocationOnMock invocation) {
- WALKey walKey = invocation.getArgumentAt(2, WALKey.class);
+ WALKey walKey = invocation.getArgumentAt(1, WALKey.class);
MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
if (mvcc != null) {
MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
@@ -171,11 +172,11 @@ public class TestBulkLoad {
@Test
public void shouldBulkLoadManyFamilyHLogEvenWhenTableNameNamespaceSpecified() throws IOException {
- when(log.append(any(HTableDescriptor.class), any(HRegionInfo.class),
+ when(log.append(any(HRegionInfo.class),
any(WALKey.class), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)),
any(boolean.class))).thenAnswer(new Answer() {
public Object answer(InvocationOnMock invocation) {
- WALKey walKey = invocation.getArgumentAt(2, WALKey.class);
+ WALKey walKey = invocation.getArgumentAt(1, WALKey.class);
MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
if (mvcc != null) {
MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index a5574d3..ed7623c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -896,7 +896,7 @@ public class TestHRegion {
storeFiles, Lists.newArrayList(newFile),
region.getRegionFileSystem().getStoreDir(Bytes.toString(family)));
- WALUtil.writeCompactionMarker(region.getWAL(), this.region.getTableDesc(),
+ WALUtil.writeCompactionMarker(region.getWAL(), this.region.getReplicationScope(),
this.region.getRegionInfo(), compactionDescriptor, region.getMVCC());
Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
@@ -4796,7 +4796,7 @@ public class TestHRegion {
//verify append called or not
verify(wal, expectAppend ? times(1) : never())
- .append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any(),
+ .append((HRegionInfo)any(), (WALKey)any(),
(WALEdit)any(), Mockito.anyBoolean());
// verify sync called or not
@@ -5998,7 +5998,7 @@ public class TestHRegion {
region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
TEST_UTIL.getConfiguration(), rss, null);
- verify(wal, times(1)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any()
+ verify(wal, times(1)).append((HRegionInfo)any(), (WALKey)any()
, editCaptor.capture(), anyBoolean());
WALEdit edit = editCaptor.getValue();
@@ -6111,7 +6111,7 @@ public class TestHRegion {
// verify that we have not appended region open event to WAL because this region is still
// recovering
- verify(wal, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any()
+ verify(wal, times(0)).append((HRegionInfo)any(), (WALKey)any()
, editCaptor.capture(), anyBoolean());
// not put the region out of recovering state
@@ -6119,7 +6119,7 @@ public class TestHRegion {
.prepare().process();
// now we should have put the entry
- verify(wal, times(1)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any()
+ verify(wal, times(1)).append((HRegionInfo)any(), (WALKey)any()
, editCaptor.capture(), anyBoolean());
WALEdit edit = editCaptor.getValue();
@@ -6163,12 +6163,12 @@ public class TestHRegion {
*/
private WAL mockWAL() throws IOException {
WAL wal = mock(WAL.class);
- Mockito.when(wal.append((HTableDescriptor)Mockito.any(), (HRegionInfo)Mockito.any(),
+ Mockito.when(wal.append((HRegionInfo)Mockito.any(),
(WALKey)Mockito.any(), (WALEdit)Mockito.any(), Mockito.anyBoolean())).
thenAnswer(new Answer<Long>() {
@Override
public Long answer(InvocationOnMock invocation) throws Throwable {
- WALKey key = invocation.getArgumentAt(2, WALKey.class);
+ WALKey key = invocation.getArgumentAt(1, WALKey.class);
MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin();
key.setWriteEntry(we);
return 1L;
@@ -6206,7 +6206,7 @@ public class TestHRegion {
region.close(false);
// 2 times, one for region open, the other close region
- verify(wal, times(2)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any(),
+ verify(wal, times(2)).append((HRegionInfo)any(), (WALKey)any(),
editCaptor.capture(), anyBoolean());
WALEdit edit = editCaptor.getAllValues().get(1);
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
index 4d5d7d8..9183e18 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
@@ -1126,7 +1126,7 @@ public class TestHRegionReplayEvents {
// test for region open and close
secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null);
- verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(),
+ verify(walSecondary, times(0)).append((HRegionInfo)any(),
(WALKey)any(), (WALEdit)any(), anyBoolean());
// test for replay prepare flush
@@ -1140,11 +1140,11 @@ public class TestHRegionReplayEvents {
.setRegionName(ByteString.copyFrom(primaryRegion.getRegionInfo().getRegionName()))
.build());
- verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(),
+ verify(walSecondary, times(0)).append((HRegionInfo)any(),
(WALKey)any(), (WALEdit)any(), anyBoolean());
secondaryRegion.close();
- verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(),
+ verify(walSecondary, times(0)).append((HRegionInfo)any(),
(WALKey)any(), (WALEdit)any(), anyBoolean());
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
index 87cbab7..76b4134 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.NavigableMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
@@ -411,7 +412,7 @@ public class TestHRegionServerBulkLoad {
private boolean found = false;
@Override
- public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) {
+ public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) {
for (Cell cell : logEdit.getCells()) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
for (Map.Entry entry : kv.toStringMap().entrySet()) {