Posted to commits@hbase.apache.org by en...@apache.org on 2013/04/27 00:12:50 UTC
svn commit: r1476419 [1/2] - in /hbase/branches/0.95:
hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/
hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/
hbase-protocol/src/main/protobuf/ hbase-server/src/main/java/org...
Author: enis
Date: Fri Apr 26 22:12:49 2013
New Revision: 1476419
URL: http://svn.apache.org/r1476419
Log:
HBASE-2231 Compaction events should be written to HLog (Stack & Enis)
Added:
hbase/branches/0.95/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WAL.java
hbase/branches/0.95/hbase-protocol/src/main/protobuf/WAL.proto
hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
Modified:
hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java
hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java
hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java?rev=1476419&r1=1476418&r2=1476419&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java Fri Apr 26 22:12:49 2013
@@ -35,19 +35,8 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableSet;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcChannel;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
-import com.google.protobuf.TextFormat;
-
-
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
@@ -63,13 +52,13 @@ import org.apache.hadoop.hbase.client.Ad
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
@@ -117,6 +106,7 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CreateTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetTableDescriptorsResponse;
+import org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor;
import org.apache.hadoop.hbase.security.access.Permission;
import org.apache.hadoop.hbase.security.access.TablePermission;
import org.apache.hadoop.hbase.security.access.UserPermission;
@@ -128,6 +118,17 @@ import org.apache.hadoop.hbase.util.Pair
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcChannel;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+import com.google.protobuf.TextFormat;
+
/**
* Protobufs utility.
*/
@@ -1997,4 +1998,23 @@ public final class ProtobufUtil {
throw new IOException(e);
}
}
+
+ public static CompactionDescriptor toCompactionDescriptor(HRegionInfo info, byte[] family,
+ List<Path> inputPaths, List<Path> outputPaths, Path storeDir) {
+ // compaction descriptor contains relative paths.
+ // input / output paths are relative to the store dir
+ // store dir is relative to region dir
+ CompactionDescriptor.Builder builder = CompactionDescriptor.newBuilder()
+ .setTableName(ByteString.copyFrom(info.getTableName()))
+ .setEncodedRegionName(ByteString.copyFrom(info.getEncodedNameAsBytes()))
+ .setFamilyName(ByteString.copyFrom(family))
+ .setStoreHomeDir(storeDir.getName()); //make relative
+ for (Path inputPath : inputPaths) {
+ builder.addCompactionInput(inputPath.getName()); //relative path
+ }
+ for (Path outputPath : outputPaths) {
+ builder.addCompactionOutput(outputPath.getName());
+ }
+ return builder.build();
+ }
}
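For reference, a minimal sketch of the relative-path convention that toCompactionDescriptor() above relies on: Path.getName() keeps only the final path component, so the descriptor records file names relative to the store directory, and the store directory name relative to the region directory. The concrete paths below are made-up placeholders.

import org.apache.hadoop.fs.Path;

public class RelativePathSketch {
  public static void main(String[] args) {
    // A hypothetical absolute store-file path, as a compaction would see it.
    Path input = new Path("/hbase/testtable/1028785192/family/2086973629878465819");
    Path storeDir = input.getParent();
    System.out.println(input.getName());    // 2086973629878465819, relative to the store dir
    System.out.println(storeDir.getName()); // family, relative to the region dir
  }
}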
Added: hbase/branches/0.95/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WAL.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WAL.java?rev=1476419&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WAL.java (added)
+++ hbase/branches/0.95/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WAL.java Fri Apr 26 22:12:49 2013
@@ -0,0 +1,938 @@
+// Generated by the protocol buffer compiler. DO NOT EDIT!
+// source: WAL.proto
+
+package org.apache.hadoop.hbase.protobuf.generated;
+
+public final class WAL {
+ private WAL() {}
+ public static void registerAllExtensions(
+ com.google.protobuf.ExtensionRegistry registry) {
+ }
+ public interface CompactionDescriptorOrBuilder
+ extends com.google.protobuf.MessageOrBuilder {
+
+ // required bytes tableName = 1;
+ boolean hasTableName();
+ com.google.protobuf.ByteString getTableName();
+
+ // required bytes encodedRegionName = 2;
+ boolean hasEncodedRegionName();
+ com.google.protobuf.ByteString getEncodedRegionName();
+
+ // required bytes familyName = 3;
+ boolean hasFamilyName();
+ com.google.protobuf.ByteString getFamilyName();
+
+ // repeated string compactionInput = 4;
+ java.util.List<String> getCompactionInputList();
+ int getCompactionInputCount();
+ String getCompactionInput(int index);
+
+ // repeated string compactionOutput = 5;
+ java.util.List<String> getCompactionOutputList();
+ int getCompactionOutputCount();
+ String getCompactionOutput(int index);
+
+ // required string storeHomeDir = 6;
+ boolean hasStoreHomeDir();
+ String getStoreHomeDir();
+ }
+ public static final class CompactionDescriptor extends
+ com.google.protobuf.GeneratedMessage
+ implements CompactionDescriptorOrBuilder {
+ // Use CompactionDescriptor.newBuilder() to construct.
+ private CompactionDescriptor(Builder builder) {
+ super(builder);
+ }
+ private CompactionDescriptor(boolean noInit) {}
+
+ private static final CompactionDescriptor defaultInstance;
+ public static CompactionDescriptor getDefaultInstance() {
+ return defaultInstance;
+ }
+
+ public CompactionDescriptor getDefaultInstanceForType() {
+ return defaultInstance;
+ }
+
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.apache.hadoop.hbase.protobuf.generated.WAL.internal_static_CompactionDescriptor_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.apache.hadoop.hbase.protobuf.generated.WAL.internal_static_CompactionDescriptor_fieldAccessorTable;
+ }
+
+ private int bitField0_;
+ // required bytes tableName = 1;
+ public static final int TABLENAME_FIELD_NUMBER = 1;
+ private com.google.protobuf.ByteString tableName_;
+ public boolean hasTableName() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ public com.google.protobuf.ByteString getTableName() {
+ return tableName_;
+ }
+
+ // required bytes encodedRegionName = 2;
+ public static final int ENCODEDREGIONNAME_FIELD_NUMBER = 2;
+ private com.google.protobuf.ByteString encodedRegionName_;
+ public boolean hasEncodedRegionName() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ public com.google.protobuf.ByteString getEncodedRegionName() {
+ return encodedRegionName_;
+ }
+
+ // required bytes familyName = 3;
+ public static final int FAMILYNAME_FIELD_NUMBER = 3;
+ private com.google.protobuf.ByteString familyName_;
+ public boolean hasFamilyName() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ public com.google.protobuf.ByteString getFamilyName() {
+ return familyName_;
+ }
+
+ // repeated string compactionInput = 4;
+ public static final int COMPACTIONINPUT_FIELD_NUMBER = 4;
+ private com.google.protobuf.LazyStringList compactionInput_;
+ public java.util.List<String>
+ getCompactionInputList() {
+ return compactionInput_;
+ }
+ public int getCompactionInputCount() {
+ return compactionInput_.size();
+ }
+ public String getCompactionInput(int index) {
+ return compactionInput_.get(index);
+ }
+
+ // repeated string compactionOutput = 5;
+ public static final int COMPACTIONOUTPUT_FIELD_NUMBER = 5;
+ private com.google.protobuf.LazyStringList compactionOutput_;
+ public java.util.List<String>
+ getCompactionOutputList() {
+ return compactionOutput_;
+ }
+ public int getCompactionOutputCount() {
+ return compactionOutput_.size();
+ }
+ public String getCompactionOutput(int index) {
+ return compactionOutput_.get(index);
+ }
+
+ // required string storeHomeDir = 6;
+ public static final int STOREHOMEDIR_FIELD_NUMBER = 6;
+ private java.lang.Object storeHomeDir_;
+ public boolean hasStoreHomeDir() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ public String getStoreHomeDir() {
+ java.lang.Object ref = storeHomeDir_;
+ if (ref instanceof String) {
+ return (String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ String s = bs.toStringUtf8();
+ if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+ storeHomeDir_ = s;
+ }
+ return s;
+ }
+ }
+ private com.google.protobuf.ByteString getStoreHomeDirBytes() {
+ java.lang.Object ref = storeHomeDir_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+ storeHomeDir_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
+ private void initFields() {
+ tableName_ = com.google.protobuf.ByteString.EMPTY;
+ encodedRegionName_ = com.google.protobuf.ByteString.EMPTY;
+ familyName_ = com.google.protobuf.ByteString.EMPTY;
+ compactionInput_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+ compactionOutput_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+ storeHomeDir_ = "";
+ }
+ private byte memoizedIsInitialized = -1;
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ if (!hasTableName()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ if (!hasEncodedRegionName()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ if (!hasFamilyName()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ if (!hasStoreHomeDir()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ public void writeTo(com.google.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize();
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ output.writeBytes(1, tableName_);
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ output.writeBytes(2, encodedRegionName_);
+ }
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ output.writeBytes(3, familyName_);
+ }
+ for (int i = 0; i < compactionInput_.size(); i++) {
+ output.writeBytes(4, compactionInput_.getByteString(i));
+ }
+ for (int i = 0; i < compactionOutput_.size(); i++) {
+ output.writeBytes(5, compactionOutput_.getByteString(i));
+ }
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ output.writeBytes(6, getStoreHomeDirBytes());
+ }
+ getUnknownFields().writeTo(output);
+ }
+
+ private int memoizedSerializedSize = -1;
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(1, tableName_);
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(2, encodedRegionName_);
+ }
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(3, familyName_);
+ }
+ {
+ int dataSize = 0;
+ for (int i = 0; i < compactionInput_.size(); i++) {
+ dataSize += com.google.protobuf.CodedOutputStream
+ .computeBytesSizeNoTag(compactionInput_.getByteString(i));
+ }
+ size += dataSize;
+ size += 1 * getCompactionInputList().size();
+ }
+ {
+ int dataSize = 0;
+ for (int i = 0; i < compactionOutput_.size(); i++) {
+ dataSize += com.google.protobuf.CodedOutputStream
+ .computeBytesSizeNoTag(compactionOutput_.getByteString(i));
+ }
+ size += dataSize;
+ size += 1 * getCompactionOutputList().size();
+ }
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(6, getStoreHomeDirBytes());
+ }
+ size += getUnknownFields().getSerializedSize();
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ @java.lang.Override
+ public boolean equals(final java.lang.Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor)) {
+ return super.equals(obj);
+ }
+ org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor other = (org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor) obj;
+
+ boolean result = true;
+ result = result && (hasTableName() == other.hasTableName());
+ if (hasTableName()) {
+ result = result && getTableName()
+ .equals(other.getTableName());
+ }
+ result = result && (hasEncodedRegionName() == other.hasEncodedRegionName());
+ if (hasEncodedRegionName()) {
+ result = result && getEncodedRegionName()
+ .equals(other.getEncodedRegionName());
+ }
+ result = result && (hasFamilyName() == other.hasFamilyName());
+ if (hasFamilyName()) {
+ result = result && getFamilyName()
+ .equals(other.getFamilyName());
+ }
+ result = result && getCompactionInputList()
+ .equals(other.getCompactionInputList());
+ result = result && getCompactionOutputList()
+ .equals(other.getCompactionOutputList());
+ result = result && (hasStoreHomeDir() == other.hasStoreHomeDir());
+ if (hasStoreHomeDir()) {
+ result = result && getStoreHomeDir()
+ .equals(other.getStoreHomeDir());
+ }
+ result = result &&
+ getUnknownFields().equals(other.getUnknownFields());
+ return result;
+ }
+
+ @java.lang.Override
+ public int hashCode() {
+ int hash = 41;
+ hash = (19 * hash) + getDescriptorForType().hashCode();
+ if (hasTableName()) {
+ hash = (37 * hash) + TABLENAME_FIELD_NUMBER;
+ hash = (53 * hash) + getTableName().hashCode();
+ }
+ if (hasEncodedRegionName()) {
+ hash = (37 * hash) + ENCODEDREGIONNAME_FIELD_NUMBER;
+ hash = (53 * hash) + getEncodedRegionName().hashCode();
+ }
+ if (hasFamilyName()) {
+ hash = (37 * hash) + FAMILYNAME_FIELD_NUMBER;
+ hash = (53 * hash) + getFamilyName().hashCode();
+ }
+ if (getCompactionInputCount() > 0) {
+ hash = (37 * hash) + COMPACTIONINPUT_FIELD_NUMBER;
+ hash = (53 * hash) + getCompactionInputList().hashCode();
+ }
+ if (getCompactionOutputCount() > 0) {
+ hash = (37 * hash) + COMPACTIONOUTPUT_FIELD_NUMBER;
+ hash = (53 * hash) + getCompactionOutputList().hashCode();
+ }
+ if (hasStoreHomeDir()) {
+ hash = (37 * hash) + STOREHOMEDIR_FIELD_NUMBER;
+ hash = (53 * hash) + getStoreHomeDir().hashCode();
+ }
+ hash = (29 * hash) + getUnknownFields().hashCode();
+ return hash;
+ }
+
+ public static org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor parseFrom(
+ com.google.protobuf.ByteString data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data).buildParsed();
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor parseFrom(
+ com.google.protobuf.ByteString data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data, extensionRegistry)
+ .buildParsed();
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor parseFrom(byte[] data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data).buildParsed();
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor parseFrom(
+ byte[] data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data, extensionRegistry)
+ .buildParsed();
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input).buildParsed();
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor parseFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input, extensionRegistry)
+ .buildParsed();
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ Builder builder = newBuilder();
+ if (builder.mergeDelimitedFrom(input)) {
+ return builder.buildParsed();
+ } else {
+ return null;
+ }
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor parseDelimitedFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ Builder builder = newBuilder();
+ if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+ return builder.buildParsed();
+ } else {
+ return null;
+ }
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor parseFrom(
+ com.google.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input).buildParsed();
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor parseFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input, extensionRegistry)
+ .buildParsed();
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ @java.lang.Override
+ protected Builder newBuilderForType(
+ com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ Builder builder = new Builder(parent);
+ return builder;
+ }
+ public static final class Builder extends
+ com.google.protobuf.GeneratedMessage.Builder<Builder>
+ implements org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptorOrBuilder {
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.apache.hadoop.hbase.protobuf.generated.WAL.internal_static_CompactionDescriptor_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.apache.hadoop.hbase.protobuf.generated.WAL.internal_static_CompactionDescriptor_fieldAccessorTable;
+ }
+
+ // Construct using org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor.newBuilder()
+ private Builder() {
+ maybeForceBuilderInitialization();
+ }
+
+ private Builder(BuilderParent parent) {
+ super(parent);
+ maybeForceBuilderInitialization();
+ }
+ private void maybeForceBuilderInitialization() {
+ if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+ }
+ }
+ private static Builder create() {
+ return new Builder();
+ }
+
+ public Builder clear() {
+ super.clear();
+ tableName_ = com.google.protobuf.ByteString.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000001);
+ encodedRegionName_ = com.google.protobuf.ByteString.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000002);
+ familyName_ = com.google.protobuf.ByteString.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000004);
+ compactionInput_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000008);
+ compactionOutput_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000010);
+ storeHomeDir_ = "";
+ bitField0_ = (bitField0_ & ~0x00000020);
+ return this;
+ }
+
+ public Builder clone() {
+ return create().mergeFrom(buildPartial());
+ }
+
+ public com.google.protobuf.Descriptors.Descriptor
+ getDescriptorForType() {
+ return org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor.getDescriptor();
+ }
+
+ public org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor getDefaultInstanceForType() {
+ return org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor.getDefaultInstance();
+ }
+
+ public org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor build() {
+ org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ private org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor buildParsed()
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(
+ result).asInvalidProtocolBufferException();
+ }
+ return result;
+ }
+
+ public org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor buildPartial() {
+ org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor result = new org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor(this);
+ int from_bitField0_ = bitField0_;
+ int to_bitField0_ = 0;
+ if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+ to_bitField0_ |= 0x00000001;
+ }
+ result.tableName_ = tableName_;
+ if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+ to_bitField0_ |= 0x00000002;
+ }
+ result.encodedRegionName_ = encodedRegionName_;
+ if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+ to_bitField0_ |= 0x00000004;
+ }
+ result.familyName_ = familyName_;
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ compactionInput_ = new com.google.protobuf.UnmodifiableLazyStringList(
+ compactionInput_);
+ bitField0_ = (bitField0_ & ~0x00000008);
+ }
+ result.compactionInput_ = compactionInput_;
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ compactionOutput_ = new com.google.protobuf.UnmodifiableLazyStringList(
+ compactionOutput_);
+ bitField0_ = (bitField0_ & ~0x00000010);
+ }
+ result.compactionOutput_ = compactionOutput_;
+ if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+ to_bitField0_ |= 0x00000008;
+ }
+ result.storeHomeDir_ = storeHomeDir_;
+ result.bitField0_ = to_bitField0_;
+ onBuilt();
+ return result;
+ }
+
+ public Builder mergeFrom(com.google.protobuf.Message other) {
+ if (other instanceof org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor) {
+ return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor)other);
+ } else {
+ super.mergeFrom(other);
+ return this;
+ }
+ }
+
+ public Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor other) {
+ if (other == org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor.getDefaultInstance()) return this;
+ if (other.hasTableName()) {
+ setTableName(other.getTableName());
+ }
+ if (other.hasEncodedRegionName()) {
+ setEncodedRegionName(other.getEncodedRegionName());
+ }
+ if (other.hasFamilyName()) {
+ setFamilyName(other.getFamilyName());
+ }
+ if (!other.compactionInput_.isEmpty()) {
+ if (compactionInput_.isEmpty()) {
+ compactionInput_ = other.compactionInput_;
+ bitField0_ = (bitField0_ & ~0x00000008);
+ } else {
+ ensureCompactionInputIsMutable();
+ compactionInput_.addAll(other.compactionInput_);
+ }
+ onChanged();
+ }
+ if (!other.compactionOutput_.isEmpty()) {
+ if (compactionOutput_.isEmpty()) {
+ compactionOutput_ = other.compactionOutput_;
+ bitField0_ = (bitField0_ & ~0x00000010);
+ } else {
+ ensureCompactionOutputIsMutable();
+ compactionOutput_.addAll(other.compactionOutput_);
+ }
+ onChanged();
+ }
+ if (other.hasStoreHomeDir()) {
+ setStoreHomeDir(other.getStoreHomeDir());
+ }
+ this.mergeUnknownFields(other.getUnknownFields());
+ return this;
+ }
+
+ public final boolean isInitialized() {
+ if (!hasTableName()) {
+
+ return false;
+ }
+ if (!hasEncodedRegionName()) {
+
+ return false;
+ }
+ if (!hasFamilyName()) {
+
+ return false;
+ }
+ if (!hasStoreHomeDir()) {
+
+ return false;
+ }
+ return true;
+ }
+
+ public Builder mergeFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+ com.google.protobuf.UnknownFieldSet.newBuilder(
+ this.getUnknownFields());
+ while (true) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+ this.setUnknownFields(unknownFields.build());
+ onChanged();
+ return this;
+ default: {
+ if (!parseUnknownField(input, unknownFields,
+ extensionRegistry, tag)) {
+ this.setUnknownFields(unknownFields.build());
+ onChanged();
+ return this;
+ }
+ break;
+ }
+ case 10: {
+ bitField0_ |= 0x00000001;
+ tableName_ = input.readBytes();
+ break;
+ }
+ case 18: {
+ bitField0_ |= 0x00000002;
+ encodedRegionName_ = input.readBytes();
+ break;
+ }
+ case 26: {
+ bitField0_ |= 0x00000004;
+ familyName_ = input.readBytes();
+ break;
+ }
+ case 34: {
+ ensureCompactionInputIsMutable();
+ compactionInput_.add(input.readBytes());
+ break;
+ }
+ case 42: {
+ ensureCompactionOutputIsMutable();
+ compactionOutput_.add(input.readBytes());
+ break;
+ }
+ case 50: {
+ bitField0_ |= 0x00000020;
+ storeHomeDir_ = input.readBytes();
+ break;
+ }
+ }
+ }
+ }
+
+ private int bitField0_;
+
+ // required bytes tableName = 1;
+ private com.google.protobuf.ByteString tableName_ = com.google.protobuf.ByteString.EMPTY;
+ public boolean hasTableName() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ public com.google.protobuf.ByteString getTableName() {
+ return tableName_;
+ }
+ public Builder setTableName(com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000001;
+ tableName_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearTableName() {
+ bitField0_ = (bitField0_ & ~0x00000001);
+ tableName_ = getDefaultInstance().getTableName();
+ onChanged();
+ return this;
+ }
+
+ // required bytes encodedRegionName = 2;
+ private com.google.protobuf.ByteString encodedRegionName_ = com.google.protobuf.ByteString.EMPTY;
+ public boolean hasEncodedRegionName() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ public com.google.protobuf.ByteString getEncodedRegionName() {
+ return encodedRegionName_;
+ }
+ public Builder setEncodedRegionName(com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000002;
+ encodedRegionName_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearEncodedRegionName() {
+ bitField0_ = (bitField0_ & ~0x00000002);
+ encodedRegionName_ = getDefaultInstance().getEncodedRegionName();
+ onChanged();
+ return this;
+ }
+
+ // required bytes familyName = 3;
+ private com.google.protobuf.ByteString familyName_ = com.google.protobuf.ByteString.EMPTY;
+ public boolean hasFamilyName() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ public com.google.protobuf.ByteString getFamilyName() {
+ return familyName_;
+ }
+ public Builder setFamilyName(com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000004;
+ familyName_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearFamilyName() {
+ bitField0_ = (bitField0_ & ~0x00000004);
+ familyName_ = getDefaultInstance().getFamilyName();
+ onChanged();
+ return this;
+ }
+
+ // repeated string compactionInput = 4;
+ private com.google.protobuf.LazyStringList compactionInput_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+ private void ensureCompactionInputIsMutable() {
+ if (!((bitField0_ & 0x00000008) == 0x00000008)) {
+ compactionInput_ = new com.google.protobuf.LazyStringArrayList(compactionInput_);
+ bitField0_ |= 0x00000008;
+ }
+ }
+ public java.util.List<String>
+ getCompactionInputList() {
+ return java.util.Collections.unmodifiableList(compactionInput_);
+ }
+ public int getCompactionInputCount() {
+ return compactionInput_.size();
+ }
+ public String getCompactionInput(int index) {
+ return compactionInput_.get(index);
+ }
+ public Builder setCompactionInput(
+ int index, String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureCompactionInputIsMutable();
+ compactionInput_.set(index, value);
+ onChanged();
+ return this;
+ }
+ public Builder addCompactionInput(String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureCompactionInputIsMutable();
+ compactionInput_.add(value);
+ onChanged();
+ return this;
+ }
+ public Builder addAllCompactionInput(
+ java.lang.Iterable<String> values) {
+ ensureCompactionInputIsMutable();
+ super.addAll(values, compactionInput_);
+ onChanged();
+ return this;
+ }
+ public Builder clearCompactionInput() {
+ compactionInput_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000008);
+ onChanged();
+ return this;
+ }
+ void addCompactionInput(com.google.protobuf.ByteString value) {
+ ensureCompactionInputIsMutable();
+ compactionInput_.add(value);
+ onChanged();
+ }
+
+ // repeated string compactionOutput = 5;
+ private com.google.protobuf.LazyStringList compactionOutput_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+ private void ensureCompactionOutputIsMutable() {
+ if (!((bitField0_ & 0x00000010) == 0x00000010)) {
+ compactionOutput_ = new com.google.protobuf.LazyStringArrayList(compactionOutput_);
+ bitField0_ |= 0x00000010;
+ }
+ }
+ public java.util.List<String>
+ getCompactionOutputList() {
+ return java.util.Collections.unmodifiableList(compactionOutput_);
+ }
+ public int getCompactionOutputCount() {
+ return compactionOutput_.size();
+ }
+ public String getCompactionOutput(int index) {
+ return compactionOutput_.get(index);
+ }
+ public Builder setCompactionOutput(
+ int index, String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureCompactionOutputIsMutable();
+ compactionOutput_.set(index, value);
+ onChanged();
+ return this;
+ }
+ public Builder addCompactionOutput(String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureCompactionOutputIsMutable();
+ compactionOutput_.add(value);
+ onChanged();
+ return this;
+ }
+ public Builder addAllCompactionOutput(
+ java.lang.Iterable<String> values) {
+ ensureCompactionOutputIsMutable();
+ super.addAll(values, compactionOutput_);
+ onChanged();
+ return this;
+ }
+ public Builder clearCompactionOutput() {
+ compactionOutput_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000010);
+ onChanged();
+ return this;
+ }
+ void addCompactionOutput(com.google.protobuf.ByteString value) {
+ ensureCompactionOutputIsMutable();
+ compactionOutput_.add(value);
+ onChanged();
+ }
+
+ // required string storeHomeDir = 6;
+ private java.lang.Object storeHomeDir_ = "";
+ public boolean hasStoreHomeDir() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ public String getStoreHomeDir() {
+ java.lang.Object ref = storeHomeDir_;
+ if (!(ref instanceof String)) {
+ String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+ storeHomeDir_ = s;
+ return s;
+ } else {
+ return (String) ref;
+ }
+ }
+ public Builder setStoreHomeDir(String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000020;
+ storeHomeDir_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearStoreHomeDir() {
+ bitField0_ = (bitField0_ & ~0x00000020);
+ storeHomeDir_ = getDefaultInstance().getStoreHomeDir();
+ onChanged();
+ return this;
+ }
+ void setStoreHomeDir(com.google.protobuf.ByteString value) {
+ bitField0_ |= 0x00000020;
+ storeHomeDir_ = value;
+ onChanged();
+ }
+
+ // @@protoc_insertion_point(builder_scope:CompactionDescriptor)
+ }
+
+ static {
+ defaultInstance = new CompactionDescriptor(true);
+ defaultInstance.initFields();
+ }
+
+ // @@protoc_insertion_point(class_scope:CompactionDescriptor)
+ }
+
+ private static com.google.protobuf.Descriptors.Descriptor
+ internal_static_CompactionDescriptor_descriptor;
+ private static
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internal_static_CompactionDescriptor_fieldAccessorTable;
+
+ public static com.google.protobuf.Descriptors.FileDescriptor
+ getDescriptor() {
+ return descriptor;
+ }
+ private static com.google.protobuf.Descriptors.FileDescriptor
+ descriptor;
+ static {
+ java.lang.String[] descriptorData = {
+ "\n\tWAL.proto\032\013hbase.proto\"\241\001\n\024CompactionD" +
+ "escriptor\022\021\n\ttableName\030\001 \002(\014\022\031\n\021encodedR" +
+ "egionName\030\002 \002(\014\022\022\n\nfamilyName\030\003 \002(\014\022\027\n\017c" +
+ "ompactionInput\030\004 \003(\t\022\030\n\020compactionOutput" +
+ "\030\005 \003(\t\022\024\n\014storeHomeDir\030\006 \002(\tB6\n*org.apac" +
+ "he.hadoop.hbase.protobuf.generatedB\003WALH" +
+ "\001\240\001\001"
+ };
+ com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
+ new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
+ public com.google.protobuf.ExtensionRegistry assignDescriptors(
+ com.google.protobuf.Descriptors.FileDescriptor root) {
+ descriptor = root;
+ internal_static_CompactionDescriptor_descriptor =
+ getDescriptor().getMessageTypes().get(0);
+ internal_static_CompactionDescriptor_fieldAccessorTable = new
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+ internal_static_CompactionDescriptor_descriptor,
+ new java.lang.String[] { "TableName", "EncodedRegionName", "FamilyName", "CompactionInput", "CompactionOutput", "StoreHomeDir", },
+ org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor.class,
+ org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor.Builder.class);
+ return null;
+ }
+ };
+ com.google.protobuf.Descriptors.FileDescriptor
+ .internalBuildGeneratedFileFrom(descriptorData,
+ new com.google.protobuf.Descriptors.FileDescriptor[] {
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.getDescriptor(),
+ }, assigner);
+ }
+
+ // @@protoc_insertion_point(outer_class_scope)
+}
Added: hbase/branches/0.95/hbase-protocol/src/main/protobuf/WAL.proto
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-protocol/src/main/protobuf/WAL.proto?rev=1476419&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-protocol/src/main/protobuf/WAL.proto (added)
+++ hbase/branches/0.95/hbase-protocol/src/main/protobuf/WAL.proto Fri Apr 26 22:12:49 2013
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+option java_package = "org.apache.hadoop.hbase.protobuf.generated";
+option java_outer_classname = "WAL";
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+import "hbase.proto";
+
+/**
+ * WAL entries
+ */
+
+/**
+ * Special WAL entry to hold all info related to a compaction.
+ * Written to the WAL before completing the compaction. There is
+ * sufficient info in the message below to complete the compaction
+ * later, should the WAL write fail.
+ */
+message CompactionDescriptor {
+ required bytes tableName = 1;
+ required bytes encodedRegionName = 2;
+ required bytes familyName = 3;
+ repeated string compactionInput = 4;
+ repeated string compactionOutput = 5;
+ required string storeHomeDir = 6;
+}
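As a sanity check of the message shape, a minimal sketch of building and round-tripping a CompactionDescriptor through the generated API above; all field values are made-up placeholders.

import com.google.protobuf.ByteString;
import org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor;

public class CompactionDescriptorRoundTrip {
  public static void main(String[] args) throws Exception {
    CompactionDescriptor cd = CompactionDescriptor.newBuilder()
        .setTableName(ByteString.copyFromUtf8("testtable"))
        .setEncodedRegionName(ByteString.copyFromUtf8("1028785192"))
        .setFamilyName(ByteString.copyFromUtf8("family"))
        .addCompactionInput("2086973629878465819")   // relative to the store dir
        .addCompactionOutput("4439539447995938118")
        .setStoreHomeDir("family")                   // relative to the region dir
        .build(); // throws UninitializedMessageException if a required field is unset
    // Round-trip through bytes, as the WAL write/replay path effectively does.
    CompactionDescriptor parsed = CompactionDescriptor.parseFrom(cd.toByteArray());
    assert parsed.getCompactionInputList().equals(cd.getCompactionInputList());
  }
}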
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java?rev=1476419&r1=1476418&r2=1476419&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java Fri Apr 26 22:12:49 2013
@@ -35,9 +35,7 @@ import org.apache.hadoop.hbase.client.HT
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
@@ -83,7 +81,7 @@ public class WALPlayer extends Configure
// skip all other tables
if (Bytes.equals(table, key.getTablename())) {
for (KeyValue kv : value.getKeyValues()) {
- if (HLogUtil.isMetaFamily(kv.getFamily())) continue;
+ if (WALEdit.isMetaEditFamily(kv.getFamily())) continue;
context.write(new ImmutableBytesWritable(kv.getRow()), kv);
}
}
@@ -127,7 +125,7 @@ public class WALPlayer extends Configure
KeyValue lastKV = null;
for (KeyValue kv : value.getKeyValues()) {
// filtering HLog meta entries
- if (HLogUtil.isMetaFamily(kv.getFamily())) continue;
+ if (WALEdit.isMetaEditFamily(kv.getFamily())) continue;
// A WALEdit may contain multiple operations (HBASE-3584) and/or
// multiple rows (HBASE-5229).
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1476419&r1=1476418&r2=1476419&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Fri Apr 26 22:12:49 2013
@@ -80,6 +80,7 @@ import org.apache.hadoop.hbase.KeyValueU
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.IsolationLevel;
@@ -89,7 +90,6 @@ import org.apache.hadoop.hbase.client.Re
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
import org.apache.hadoop.hbase.exceptions.DroppedSnapshotException;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
@@ -115,6 +115,7 @@ import org.apache.hadoop.hbase.monitorin
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
@@ -235,7 +236,7 @@ public class HRegion implements HeapSize
private final HLog log;
private final HRegionFileSystem fs;
- private final Configuration conf;
+ protected final Configuration conf;
private final Configuration baseConf;
private final KeyValue.KVComparator comparator;
private final int rowLockWaitDuration;
@@ -1174,7 +1175,7 @@ public class HRegion implements HeapSize
* Do preparation for pending compaction.
* @throws IOException
*/
- void doRegionCompactionPrep() throws IOException {
+ protected void doRegionCompactionPrep() throws IOException {
}
void triggerMajorCompaction() {
@@ -2162,7 +2163,7 @@ public class HRegion implements HeapSize
batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
Mutation m = batchOp.operations[i].getFirst();
- Durability tmpDur = m.getDurability();
+ Durability tmpDur = m.getDurability();
if (tmpDur.ordinal() > durability.ordinal()) {
durability = tmpDur;
}
@@ -2924,9 +2925,16 @@ public class HRegion implements HeapSize
for (KeyValue kv: val.getKeyValues()) {
// Check this edit is for me. Also, guard against writing the special
// METACOLUMN info such as HBASE::CACHEFLUSH entries
- if (kv.matchingFamily(HLog.METAFAMILY) ||
+ if (kv.matchingFamily(WALEdit.METAFAMILY) ||
!Bytes.equals(key.getEncodedRegionName(),
this.getRegionInfo().getEncodedNameAsBytes())) {
+ //this is a special edit, we should handle it
+ CompactionDescriptor compaction = WALEdit.getCompaction(kv);
+ if (compaction != null) {
+ //replay the compaction
+ completeCompactionMarker(compaction);
+ }
+
skippedEdits++;
continue;
}
@@ -3001,6 +3009,23 @@ public class HRegion implements HeapSize
}
/**
+ * Call to complete a compaction. It's for the case where we find in the WAL a compaction
+ * that was not finished. We could find one when recovering a WAL after a regionserver crash.
+ * See HBASE-2231.
+ * @param compaction the compaction descriptor recovered from the WAL
+ */
+ void completeCompactionMarker(CompactionDescriptor compaction)
+ throws IOException {
+ Store store = this.getStore(compaction.getFamilyName().toByteArray());
+ if (store == null) {
+ LOG.warn("Found Compaction WAL edit for deleted family:" + Bytes.toString(compaction.getFamilyName().toByteArray()));
+ return;
+ }
+ store.completeCompactionMarker(compaction);
+ }
+
+ /**
* Used by tests
* @param s Store to add edit too.
* @param kv KeyValue to add.
@@ -3472,10 +3497,10 @@ public class HRegion implements HeapSize
public long getMvccReadPoint() {
return this.readPt;
}
-
+
/**
* Reset both the filter and the old filter.
- *
+ *
* @throws IOException in case a filter raises an I/O exception.
*/
protected void resetFilters() throws IOException {
@@ -3684,7 +3709,7 @@ public class HRegion implements HeapSize
// If joinedHeap is pointing to some other row, try to seek to a correct one.
boolean mayHaveData =
(nextJoinedKv != null && nextJoinedKv.matchingRow(currentRow, offset, length))
- || (this.joinedHeap.requestSeek(KeyValue.createFirstOnRow(currentRow, offset, length),
+ || (this.joinedHeap.requestSeek(KeyValue.createFirstOnRow(currentRow, offset, length),
true, true)
&& joinedHeap.peek() != null
&& joinedHeap.peek().matchingRow(currentRow, offset, length));
@@ -4292,7 +4317,7 @@ public class HRegion implements HeapSize
LOG.debug("Files for region: " + b);
b.getRegionFileSystem().logFileSystemState(LOG);
}
-
+
RegionMergeTransaction rmt = new RegionMergeTransaction(a, b, true);
if (!rmt.prepare(null)) {
throw new IOException("Unable to merge regions " + a + " and " + b);
@@ -4318,7 +4343,7 @@ public class HRegion implements HeapSize
LOG.debug("Files for new region");
dstRegion.getRegionFileSystem().logFileSystemState(LOG);
}
-
+
if (dstRegion.getRegionFileSystem().hasReferences(dstRegion.getTableDesc())) {
throw new IOException("Merged region " + dstRegion
+ " still has references after the compaction, is compaction canceled?");
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1476419&r1=1476418&r2=1476419&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Fri Apr 26 22:12:49 2013
@@ -52,23 +52,23 @@ import org.apache.hadoop.hbase.HRegionIn
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.exceptions.InvalidHFileException;
import org.apache.hadoop.hbase.exceptions.WrongRegionException;
-import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
-import org.apache.hadoop.hbase.exceptions.InvalidHFileException;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
-import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
+import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
@@ -123,6 +123,16 @@ public class HStore implements Store {
static int closeCheckInterval = 0;
private volatile long storeSize = 0L;
private volatile long totalUncompressedBytes = 0L;
+
+ /**
+ * RWLock for store operations.
+ * Locked in shared mode when the list of component stores is looked at:
+ * - all reads/writes to table data
+ * - checking for split
+ * Locked in exclusive mode when the list of component stores is modified:
+ * - closing
+ * - completing a compaction
+ */
final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final boolean verifyBulkLoads;
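The lock discipline in the comment above is the standard ReentrantReadWriteLock pattern; a minimal standalone sketch (illustrative only, not HStore code):

import java.util.concurrent.locks.ReentrantReadWriteLock;

class StoreLockSketch {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  void readPath() {           // e.g. reads/writes to table data, checking for split
    lock.readLock().lock();   // shared mode: many threads may look at the file list
    try {
      // look at the list of component store files
    } finally {
      lock.readLock().unlock();
    }
  }

  void swapStoreFiles() {     // e.g. closing, completing a compaction
    lock.writeLock().lock();  // exclusive mode: the list itself is modified
    try {
      // replace the list of component store files
    } finally {
      lock.writeLock().unlock();
    }
  }
}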
@@ -388,14 +398,11 @@ public class HStore implements Store {
new ExecutorCompletionService<StoreFile>(storeFileOpenerThreadPool);
int totalValidStoreFile = 0;
- final FileSystem fs = this.getFileSystem();
for (final StoreFileInfo storeFileInfo: files) {
// open each store file in parallel
completionService.submit(new Callable<StoreFile>() {
public StoreFile call() throws IOException {
- StoreFile storeFile = new StoreFile(fs, storeFileInfo.getPath(), conf, cacheConf,
- family.getBloomFilterType(), dataBlockEncoder);
- storeFile.createReader();
+ StoreFile storeFile = createStoreFileAndReader(storeFileInfo.getPath());
return storeFile;
}
});
@@ -439,6 +446,17 @@ public class HStore implements Store {
return results;
}
+ private StoreFile createStoreFileAndReader(final Path p) throws IOException {
+ return createStoreFileAndReader(p, this.dataBlockEncoder);
+ }
+
+ private StoreFile createStoreFileAndReader(final Path p, final HFileDataBlockEncoder encoder) throws IOException {
+ StoreFile storeFile = new StoreFile(this.getFileSystem(), p, this.conf, this.cacheConf,
+ this.family.getBloomFilterType(), encoder);
+ storeFile.createReader();
+ return storeFile;
+ }
+
@Override
public long add(final KeyValue kv) {
lock.readLock().lock();
@@ -552,10 +570,9 @@ public class HStore implements Store {
Path srcPath = new Path(srcPathStr);
Path dstPath = fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
- StoreFile sf = new StoreFile(this.getFileSystem(), dstPath, this.conf, this.cacheConf,
- this.family.getBloomFilterType(), this.dataBlockEncoder);
+ StoreFile sf = createStoreFileAndReader(dstPath);
- StoreFile.Reader r = sf.createReader();
+ StoreFile.Reader r = sf.getReader();
this.storeSize += r.length();
this.totalUncompressedBytes += r.getTotalUncompressedBytes();
@@ -715,10 +732,9 @@ public class HStore implements Store {
Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path);
status.setStatus("Flushing " + this + ": reopening flushed file");
- StoreFile sf = new StoreFile(this.getFileSystem(), dstPath, this.conf, this.cacheConf,
- this.family.getBloomFilterType(), this.dataBlockEncoder);
+ StoreFile sf = createStoreFileAndReader(dstPath);
- StoreFile.Reader r = sf.createReader();
+ StoreFile.Reader r = sf.getReader();
this.storeSize += r.length();
this.totalUncompressedBytes += r.getTotalUncompressedBytes();
@@ -877,6 +893,29 @@ public class HStore implements Store {
* <p>We don't want to hold the structureLock for the whole time, as a compact()
* can be lengthy and we want to allow cache-flushes during this period.
*
+ * <p> Compaction event should be idempotent, since there is no IO Fencing for
+ * the region directory in hdfs. A region server might still try to complete the
+ * compaction after it lost the region. That is why the following events are carefully
+ * ordered for a compaction:
+ * 1. Compaction writes new files under region/.tmp directory (compaction output)
+ * 2. Compaction atomically moves the temporary file under region directory
+ * 3. Compaction appends a WAL edit containing the compaction input and output files.
+ * Forces sync on WAL.
+ * 4. Compaction deletes the input files from the region directory.
+ *
+ * Failure conditions are handled like this:
+ * - If RS fails before 2, compaction wont complete. Even if RS lives on and finishes
+ * the compaction later, it will only write the new data file to the region directory.
+ * Since we already have this data, this will be idempotent but we will have a redundant
+ * copy of the data.
+ * - If RS fails between 2 and 3, the region will have a redundant copy of the data. The
+ * RS that failed won't be able to finish snyc() for WAL because of lease recovery in WAL.
+ * - If RS fails after 3, the region region server who opens the region will pick up the
+ * the compaction marker from the WAL and replay it by removing the compaction input files.
+ * Failed RS can also attempt to delete those files, but the operation will be idempotent
+ *
+ * See HBASE-2231 for details.
+ *
* @param compaction compaction details obtained from requestCompaction()
* @throws IOException
* @return Storefile we compacted into or null if we failed or opted out early.
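The ordering above, restated as a compact standalone sketch. The helper methods are hypothetical stand-ins for the HRegionFileSystem/HLogUtil calls made in the code below, not actual HBase API:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Illustrative only: the four-step compaction ordering described above.
abstract class CompactionOrderingSketch {
  abstract List<String> writeTmpOutputs() throws IOException;              // step 1: region/.tmp
  abstract String commitIntoRegionDir(String tmpFile) throws IOException;  // step 2: atomic move
  abstract void appendCompactionMarkerAndSyncWal(List<String> inputs,
      List<String> outputs) throws IOException;                            // step 3: WAL edit + sync
  abstract void deleteInputs(List<String> inputs) throws IOException;      // step 4

  final void compact(List<String> inputs) throws IOException {
    List<String> outputs = new ArrayList<String>();
    for (String tmp : writeTmpOutputs()) {
      outputs.add(commitIntoRegionDir(tmp));           // data now exists twice, harmlessly
    }
    appendCompactionMarkerAndSyncWal(inputs, outputs); // marker is durable first...
    deleteInputs(inputs);  // ...so a crash here is repaired by replaying the marker
  }
}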
@@ -905,6 +944,13 @@ public class HStore implements Store {
List<Path> newFiles = compaction.compact();
// Move the compaction into place.
if (this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
+ //Write compaction to WAL
+ List<Path> inputPaths = new ArrayList<Path>();
+ for (StoreFile f : filesToCompact) {
+ inputPaths.add(f.getPath());
+ }
+
+ ArrayList<Path> outputPaths = new ArrayList(newFiles.size());
for (Path newFile: newFiles) {
assert newFile != null;
StoreFile sf = moveFileIntoPlace(newFile);
@@ -913,14 +959,21 @@ public class HStore implements Store {
}
assert sf != null;
sfs.add(sf);
+ outputPaths.add(sf.getPath());
+ }
+ if (region.getLog() != null) {
+ HRegionInfo info = this.region.getRegionInfo();
+ CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
+ family.getName(), inputPaths, outputPaths, fs.getStoreDir(getFamily().getNameAsString()));
+
+ HLogUtil.writeCompactionMarker(region.getLog(), this.region.getTableDesc(),
+ this.region.getRegionInfo(), compactionDescriptor);
}
completeCompaction(filesToCompact, sfs);
} else {
for (Path newFile: newFiles) {
// Create storefile around what we wrote with a reader on it.
- StoreFile sf = new StoreFile(this.getFileSystem(), newFile, this.conf, this.cacheConf,
- this.family.getBloomFilterType(), this.dataBlockEncoder);
- sf.createReader();
+ StoreFile sf = createStoreFileAndReader(newFile);
sfs.add(sf);
}
}
@@ -969,10 +1022,58 @@ public class HStore implements Store {
validateStoreFile(newFile);
// Move the file into the right spot
Path destPath = fs.commitStoreFile(getColumnFamilyName(), newFile);
- StoreFile result = new StoreFile(this.getFileSystem(), destPath, this.conf, this.cacheConf,
- this.family.getBloomFilterType(), this.dataBlockEncoder);
- result.createReader();
- return result;
+ StoreFile sf = createStoreFileAndReader(destPath);
+
+ return sf;
+ }
+
+ /**
+ * Call to complete a compaction. It's for the case where we find in the WAL a compaction
+ * that was not finished. We could find one when recovering a WAL after a regionserver crash.
+ * See HBASE-2231.
+ * @param compaction
+ */
+ public void completeCompactionMarker(CompactionDescriptor compaction)
+ throws IOException {
+ LOG.debug("Completing compaction from the WAL marker");
+ List<String> compactionInputs = compaction.getCompactionInputList();
+ List<String> compactionOutputs = compaction.getCompactionOutputList();
+
+ List<StoreFile> outputStoreFiles = new ArrayList<StoreFile>(compactionOutputs.size());
+ for (String compactionOutput : compactionOutputs) {
+ //we should have this store file already
+ boolean found = false;
+ Path outputPath = new Path(fs.getStoreDir(family.getNameAsString()), compactionOutput);
+ outputPath = outputPath.makeQualified(fs.getFileSystem());
+ for (StoreFile sf : this.getStorefiles()) {
+ if (sf.getPath().makeQualified(sf.getPath().getFileSystem(conf)).equals(outputPath)) {
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ if (getFileSystem().exists(outputPath)) {
+ outputStoreFiles.add(createStoreFileAndReader(outputPath));
+ }
+ }
+ }
+
+ List<Path> inputPaths = new ArrayList<Path>(compactionInputs.size());
+ for (String compactionInput : compactionInputs) {
+ Path inputPath = new Path(fs.getStoreDir(family.getNameAsString()), compactionInput);
+ inputPath = inputPath.makeQualified(fs.getFileSystem());
+ inputPaths.add(inputPath);
+ }
+
+ // some of the input files might already be deleted
+ List<StoreFile> inputStoreFiles = new ArrayList<StoreFile>(compactionInputs.size());
+ for (StoreFile sf : this.getStorefiles()) {
+ if (inputPaths.contains(sf.getPath().makeQualified(fs.getFileSystem()))) {
+ inputStoreFiles.add(sf);
+ }
+ }
+
+ this.completeCompaction(inputStoreFiles, outputStoreFiles);
}
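[The core of the replay above is matching the marker's relative file names against the
store's live files by fully qualified path, which is what makes a second replay harmless.
A java.nio sketch of that matching follows; the real code qualifies
org.apache.hadoop.fs.Path instances against the store's FileSystem, so the types here are
stand-ins.]

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Illustrative replay matching: marker entries are bare file names, so they
// are re-qualified against the store directory before comparison with the
// store's live files. Outputs already tracked are skipped and archived
// inputs match nothing, so replaying the same marker twice is a no-op.
final class MarkerReplaySketch {
  static List<Path> inputsStillPresent(Path storeDir, List<String> markerInputs,
      Set<Path> liveStoreFiles) {
    List<Path> stillLive = new ArrayList<Path>();
    for (String name : markerInputs) {
      Path qualified = storeDir.resolve(name).toAbsolutePath().normalize();
      if (liveStoreFiles.contains(qualified)) {
        stillLive.add(qualified); // still referenced: must be removed on replay
      }
    }
    return stillLive;
  }
}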
/**
@@ -1179,10 +1280,7 @@ public class HStore implements Store {
throws IOException {
StoreFile storeFile = null;
try {
- storeFile = new StoreFile(this.getFileSystem(), path, this.conf,
- this.cacheConf, this.family.getBloomFilterType(),
- NoOpDataBlockEncoder.INSTANCE);
- storeFile.createReader();
+ storeFile = createStoreFileAndReader(path, NoOpDataBlockEncoder.INSTANCE);
} catch (IOException e) {
LOG.error("Failed to open store file : " + path
+ ", keeping it in tmp location", e);
@@ -1213,7 +1311,7 @@ public class HStore implements Store {
* @return StoreFile created. May be null.
* @throws IOException
*/
- private void completeCompaction(final Collection<StoreFile> compactedFiles,
+ protected void completeCompaction(final Collection<StoreFile> compactedFiles,
final Collection<StoreFile> result) throws IOException {
try {
this.lock.writeLock().lock();
@@ -1238,6 +1336,9 @@ public class HStore implements Store {
// let the archive util decide if we should archive or delete the files
LOG.debug("Removing store files after compaction...");
+ for (StoreFile compactedFile : compactedFiles) {
+ compactedFile.closeReader(true);
+ }
this.fs.removeStoreFiles(this.getColumnFamilyName(), compactedFiles);
} catch (IOException e) {
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1476419&r1=1476418&r2=1476419&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Fri Apr 26 22:12:49 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.io.HeapSi
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
+import org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -103,7 +104,7 @@ public interface Store extends HeapSize,
* This operation is atomic on each KeyValue (row/family/qualifier) but not necessarily atomic
* across all of them.
* @param cells
- * @param readpoint readpoint below which we can safely remove duplicate KVs
+ * @param readpoint readpoint below which we can safely remove duplicate KVs
* @return memstore size delta
* @throws IOException
*/
@@ -190,6 +191,15 @@ public interface Store extends HeapSize,
public StoreFlushContext createFlushContext(long cacheFlushId);
+ /**
+ * Call to complete a compaction. It is for the case where we find in the WAL a compaction
+ * that was not finished. We could find one when recovering a WAL after a region server crash.
+ * See HBASE-2231.
+ * @param compaction the descriptor of the unfinished compaction, read back from the WAL
+ */
+ public void completeCompactionMarker(CompactionDescriptor compaction)
+ throws IOException;
+
// Split oriented methods
public boolean canSplit();
@@ -211,7 +221,7 @@ public interface Store extends HeapSize,
/**
* This method should only be called from HRegion. It is assumed that the ranges of values in the
* HFile fit within the stores assigned region. (assertBulkLoadHFileOk checks this)
- *
+ *
* @param srcPathStr
* @param sequenceId sequence Id associated with the HFile
*/
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java?rev=1476419&r1=1476418&r2=1476419&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java Fri Apr 26 22:12:49 2013
@@ -27,7 +27,6 @@ import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.TreeMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -107,7 +106,7 @@ import org.apache.hadoop.util.StringUtil
@InterfaceAudience.Private
class FSHLog implements HLog, Syncable {
static final Log LOG = LogFactory.getLog(FSHLog.class);
-
+
private final FileSystem fs;
private final Path rootDir;
private final Path dir;
@@ -123,12 +122,11 @@ class FSHLog implements HLog, Syncable {
private long lastDeferredTxid;
private final Path oldLogDir;
private volatile boolean logRollRunning;
- private boolean failIfLogDirExists;
private WALCoprocessorHost coprocessorHost;
private FSDataOutputStream hdfs_out; // FSDataOutputStream associated with the current SequenceFile.writer
- // Minimum tolerable replicas, if the actual value is lower than it,
+ // Minimum tolerable replicas, if the actual value is lower than it,
// rollWriter will be triggered
private int minTolerableReplication;
private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas
@@ -241,10 +239,10 @@ class FSHLog implements HLog, Syncable {
public FSHLog(final FileSystem fs, final Path root, final String logDir,
final Configuration conf)
throws IOException {
- this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME,
+ this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME,
conf, null, true, null, false);
}
-
+
/**
* Constructor.
*
@@ -258,7 +256,7 @@ class FSHLog implements HLog, Syncable {
public FSHLog(final FileSystem fs, final Path root, final String logDir,
final String oldLogDir, final Configuration conf)
throws IOException {
- this(fs, root, logDir, oldLogDir,
+ this(fs, root, logDir, oldLogDir,
conf, null, true, null, false);
}
@@ -284,7 +282,7 @@ class FSHLog implements HLog, Syncable {
public FSHLog(final FileSystem fs, final Path root, final String logDir,
final Configuration conf, final List<WALActionsListener> listeners,
final String prefix) throws IOException {
- this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME,
+ this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME,
conf, listeners, true, prefix, false);
}
@@ -311,7 +309,7 @@ class FSHLog implements HLog, Syncable {
* @throws IOException
*/
public FSHLog(final FileSystem fs, final Path root, final String logDir,
- final String oldLogDir, final Configuration conf,
+ final String oldLogDir, final Configuration conf,
final List<WALActionsListener> listeners,
final boolean failIfLogDirExists, final String prefix, boolean forMeta)
throws IOException {
@@ -322,15 +320,13 @@ class FSHLog implements HLog, Syncable {
this.oldLogDir = new Path(this.rootDir, oldLogDir);
this.forMeta = forMeta;
this.conf = conf;
-
+
if (listeners != null) {
for (WALActionsListener i: listeners) {
registerWALActionsListener(i);
}
}
-
- this.failIfLogDirExists = failIfLogDirExists;
-
+
this.blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize",
getDefaultBlockSize());
// Roll at 95% of block size.
@@ -338,7 +334,7 @@ class FSHLog implements HLog, Syncable {
this.logrollsize = (long)(this.blocksize * multi);
this.optionalFlushInterval =
conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000);
-
+
this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
this.minTolerableReplication = conf.getInt(
"hbase.regionserver.hlog.tolerable.lowreplication",
@@ -348,9 +344,9 @@ class FSHLog implements HLog, Syncable {
this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true);
this.closeErrorsTolerated = conf.getInt(
"hbase.regionserver.logroll.errors.tolerated", 0);
-
+
this.logSyncer = new LogSyncer(this.optionalFlushInterval);
-
+
LOG.info("HLog configuration: blocksize=" +
StringUtils.byteDesc(this.blocksize) +
", rollsize=" + StringUtils.byteDesc(this.logrollsize) +
@@ -375,7 +371,7 @@ class FSHLog implements HLog, Syncable {
}
// rollWriter sets this.hdfs_out if it can.
rollWriter();
-
+
// handle the reflection necessary to call getNumCurrentReplicas()
this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out);
@@ -392,7 +388,7 @@ class FSHLog implements HLog, Syncable {
this.metrics = new MetricsWAL();
}
-
+
// use reflection to search for getDefaultBlockSize(Path f)
// if the method doesn't exist, fall back to using getDefaultBlockSize()
private long getDefaultBlockSize() throws IOException {
@@ -485,7 +481,7 @@ class FSHLog implements HLog, Syncable {
* @return The wrapped stream our writer is using; its not the
* writer's 'out' FSDatoOutputStream but the stream that this 'out' wraps
* (In hdfs its an instance of DFSDataOutputStream).
- *
+ *
* usage: see TestLogRolling.java
*/
OutputStream getOutputStream() {
@@ -576,7 +572,7 @@ class FSHLog implements HLog, Syncable {
/**
* This method allows subclasses to inject different writers without having to
* extend other methods like rollWriter().
- *
+ *
* @param fs
* @param path
* @param conf
@@ -773,28 +769,30 @@ class FSHLog implements HLog, Syncable {
close();
if (!fs.exists(this.dir)) return;
FileStatus[] files = fs.listStatus(this.dir);
- for(FileStatus file : files) {
+ if (files != null) {
+ for(FileStatus file : files) {
- Path p = getHLogArchivePath(this.oldLogDir, file.getPath());
- // Tell our listeners that a log is going to be archived.
- if (!this.listeners.isEmpty()) {
- for (WALActionsListener i : this.listeners) {
- i.preLogArchive(file.getPath(), p);
+ Path p = getHLogArchivePath(this.oldLogDir, file.getPath());
+ // Tell our listeners that a log is going to be archived.
+ if (!this.listeners.isEmpty()) {
+ for (WALActionsListener i : this.listeners) {
+ i.preLogArchive(file.getPath(), p);
+ }
}
- }
- if (!fs.rename(file.getPath(),p)) {
- throw new IOException("Unable to rename " + file.getPath() + " to " + p);
- }
- // Tell our listeners that a log was archived.
- if (!this.listeners.isEmpty()) {
- for (WALActionsListener i : this.listeners) {
- i.postLogArchive(file.getPath(), p);
+ if (!fs.rename(file.getPath(),p)) {
+ throw new IOException("Unable to rename " + file.getPath() + " to " + p);
+ }
+ // Tell our listeners that a log was archived.
+ if (!this.listeners.isEmpty()) {
+ for (WALActionsListener i : this.listeners) {
+ i.postLogArchive(file.getPath(), p);
+ }
}
}
+ LOG.debug("Moved " + files.length + " log files to " +
+ FSUtils.getPath(this.oldLogDir));
}
- LOG.debug("Moved " + files.length + " log files to " +
- FSUtils.getPath(this.oldLogDir));
if (!fs.delete(dir, true)) {
LOG.info("Unable to delete " + dir);
}
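[The null check added above exists because some Hadoop FileSystem implementations return
null from listStatus() instead of throwing when the directory is gone. A minimal
illustration of the guard, assuming nothing beyond the stock Hadoop API:]

import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Defensive use of FileSystem.listStatus(): the result can be null on some
// implementations when the path does not exist, so check before iterating.
public final class ListStatusGuard {
  public static int countEntries(FileSystem fs, Path dir) throws IOException {
    FileStatus[] files = fs.listStatus(dir);
    if (files == null) {
      return 0; // directory vanished out from under us; nothing to archive
    }
    return files.length;
  }
}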
@@ -844,14 +842,15 @@ class FSHLog implements HLog, Syncable {
/**
* @param now
- * @param regionName
+ * @param encodedRegionName Encoded name of the region as returned by
+ * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
* @param tableName
* @param clusterId
* @return New log key.
*/
- protected HLogKey makeKey(byte[] regionName, byte[] tableName, long seqnum,
+ protected HLogKey makeKey(byte[] encodedRegionName, byte[] tableName, long seqnum,
long now, UUID clusterId) {
- return new HLogKey(regionName, tableName, seqnum, now, clusterId);
+ return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterId);
}
@Override
@@ -953,7 +952,7 @@ class FSHLog implements HLog, Syncable {
}
// Sync if catalog region, and if not then check if that table supports
// deferred log flushing
- if (doSync &&
+ if (doSync &&
(info.isMetaRegion() ||
!htd.isDeferredLogFlush())) {
// sync txn to file system
@@ -963,14 +962,14 @@ class FSHLog implements HLog, Syncable {
}
@Override
- public long appendNoSync(HRegionInfo info, byte [] tableName, WALEdit edits,
+ public long appendNoSync(HRegionInfo info, byte [] tableName, WALEdit edits,
UUID clusterId, final long now, HTableDescriptor htd)
throws IOException {
return append(info, tableName, edits, clusterId, now, htd, false);
}
@Override
- public long append(HRegionInfo info, byte [] tableName, WALEdit edits,
+ public long append(HRegionInfo info, byte [] tableName, WALEdit edits,
UUID clusterId, final long now, HTableDescriptor htd)
throws IOException {
return append(info, tableName, edits, clusterId, now, htd, true);
@@ -992,8 +991,8 @@ class FSHLog implements HLog, Syncable {
// List of pending writes to the HLog. There corresponds to transactions
// that have not yet returned to the client. We keep them cached here
- // instead of writing them to HDFS piecemeal, because the HDFS write
- // method is pretty heavyweight as far as locking is concerned. The
+ // instead of writing them to HDFS piecemeal, because the HDFS write
+ // method is pretty heavyweight as far as locking is concerned. The
// goal is to increase the batchsize for writing-to-hdfs as well as
// sync-to-hdfs, so that we can get better system throughput.
private List<Entry> pendingWrites = new LinkedList<Entry>();
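[That comment is the rationale for the whole pending-writes path. Reduced to a toy sketch
of the batching pattern it describes, not the FSHLog implementation itself:]

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;

// Toy write batching: append() buffers entries per transaction; a single
// flush-and-sync then covers every buffered transaction, amortizing the
// heavyweight, lock-protected storage write across the batch.
abstract class BatchedWalSketch<E> {
  private final List<E> pendingWrites = new LinkedList<E>();
  private long txid; // returned from append(); sync callers wait on it

  abstract void writeToStorage(List<E> batch) throws IOException; // heavyweight write
  abstract void syncStorage() throws IOException;                 // e.g. hflush/hsync

  synchronized long append(E entry) {
    pendingWrites.add(entry);
    return ++txid;
  }

  synchronized void sync() throws IOException {
    List<E> batch = new LinkedList<E>(pendingWrites);
    pendingWrites.clear();
    writeToStorage(batch); // one big write instead of piecemeal appends
    syncStorage();         // one sync covers the whole batch
  }
}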
@@ -1088,7 +1087,7 @@ class FSHLog implements HLog, Syncable {
try {
long doneUpto;
long now = EnvironmentEdgeManager.currentTimeMillis();
- // First flush all the pending writes to HDFS. Then
+ // First flush all the pending writes to HDFS. Then
// issue the sync to HDFS. If sync is successful, then update
// syncedTillHere to indicate that transactions till this
// number has been successfully synced.
@@ -1114,7 +1113,7 @@ class FSHLog implements HLog, Syncable {
tempWriter = this.writer;
logSyncer.hlogFlush(tempWriter, pending);
}
- }
+ }
}
// another thread might have sync'ed avoid double-sync'ing
if (txid <= this.syncedTillHere) {
@@ -1251,6 +1250,7 @@ class FSHLog implements HLog, Syncable {
}
}
+ // TODO: Remove info. Unused.
protected void doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit,
HTableDescriptor htd)
throws IOException {
@@ -1363,13 +1363,13 @@ class FSHLog implements HLog, Syncable {
/**
* Get the directory we are making logs in.
- *
+ *
* @return dir
*/
protected Path getDir() {
return dir;
}
-
+
static Path getHLogArchivePath(Path oldLogDir, Path p) {
return new Path(oldLogDir, p.getName());
}
@@ -1407,7 +1407,7 @@ class FSHLog implements HLog, Syncable {
conf, baseDir, p, oldLogDir, fs);
logSplitter.splitLog();
}
-
+
@Override
public WALCoprocessorHost getCoprocessorHost() {
return coprocessorHost;
@@ -1456,4 +1456,4 @@ class FSHLog implements HLog, Syncable {
System.exit(-1);
}
}
-}
+}
\ No newline at end of file
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1476419&r1=1476418&r2=1476419&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Fri Apr 26 22:12:49 2013
@@ -27,24 +27,20 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.exceptions.FailedLogCloseException;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.FailedLogCloseException;
+import org.apache.hadoop.io.Writable;
@InterfaceAudience.Private
public interface HLog {
public static final Log LOG = LogFactory.getLog(HLog.class);
- public static final byte[] METAFAMILY = Bytes.toBytes("METAFAMILY");
- static final byte[] METAROW = Bytes.toBytes("METAROW");
-
/** File Extension used while splitting an HLog into regions (HBASE-2312) */
public static final String SPLITTING_EXT = "-splitting";
public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;
@@ -96,7 +92,7 @@ public interface HLog {
/**
* Constructor for both params
- *
+ *
* @param edit
* log's edit
* @param key
@@ -110,7 +106,7 @@ public interface HLog {
/**
* Gets the edit
- *
+ *
* @return edit
*/
public WALEdit getEdit() {
@@ -119,7 +115,7 @@ public interface HLog {
/**
* Gets the key
- *
+ *
* @return key
*/
public HLogKey getKey() {
@@ -128,7 +124,7 @@ public interface HLog {
/**
* Set compression context for this entry.
- *
+ *
* @param compressionContext
* Compression context
*/
@@ -157,14 +153,14 @@ public interface HLog {
/**
* registers WALActionsListener
- *
+ *
* @param listener
*/
public void registerWALActionsListener(final WALActionsListener listener);
/**
* unregisters WALActionsListener
- *
+ *
* @param listener
*/
public boolean unregisterWALActionsListener(final WALActionsListener listener);
@@ -178,7 +174,7 @@ public interface HLog {
* Called by HRegionServer when it opens a new region to ensure that log
* sequence numbers are always greater than the latest sequence number of the
* region being brought on-line.
- *
+ *
* @param newvalue
* We'll set log edit/sequence number to this value if it is greater
* than the current value.
@@ -192,7 +188,7 @@ public interface HLog {
/**
* Roll the log writer. That is, start writing log messages to a new file.
- *
+ *
* <p>
* The implementation is synchronized in order to make sure there's one rollWriter
* running at any given time.
@@ -207,11 +203,11 @@ public interface HLog {
/**
* Roll the log writer. That is, start writing log messages to a new file.
- *
+ *
* <p>
* The implementation is synchronized in order to make sure there's one rollWriter
* running at any given time.
- *
+ *
* @param force
* If true, force creation of a new writer even if no entries have
* been written to the current writer
@@ -226,21 +222,21 @@ public interface HLog {
/**
* Shut down the log.
- *
+ *
* @throws IOException
*/
public void close() throws IOException;
/**
* Shut down the log and delete the log directory
- *
+ *
* @throws IOException
*/
public void closeAndDelete() throws IOException;
/**
* Append an entry to the log.
- *
+ *
* @param regionInfo
* @param logEdit
* @param logKey
@@ -254,7 +250,7 @@ public interface HLog {
/**
* Only used in tests.
- *
+ *
* @param info
* @param tableName
* @param edits
@@ -269,7 +265,7 @@ public interface HLog {
* Append a set of edits to the log. Log edits are keyed by (encoded)
* regionName, rowname, and log-sequence-id. The HLog is not flushed after
* this transaction is written to the log.
- *
+ *
* @param info
* @param tableName
* @param edits
@@ -286,7 +282,7 @@ public interface HLog {
* Append a set of edits to the log. Log edits are keyed by (encoded)
* regionName, rowname, and log-sequence-id. The HLog is flushed after this
* transaction is written to the log.
- *
+ *
* @param info
* @param tableName
* @param edits
@@ -351,7 +347,7 @@ public interface HLog {
/**
* Get LowReplication-Roller status
- *
+ *
* @return lowReplicationRollEnabled
*/
public boolean isLowReplicationRollEnabled();