You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ec...@apache.org on 2013/02/25 23:50:29 UTC

svn commit: r1449950 [22/35] - in /hbase/trunk: ./ hbase-client/ hbase-client/src/ hbase-client/src/main/ hbase-client/src/main/java/ hbase-client/src/main/java/org/ hbase-client/src/main/java/org/apache/ hbase-client/src/main/java/org/apache/hadoop/ h...

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,1141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.protobuf;
+
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Action;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
+import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Column;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiAction;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.ColumnValue;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.ColumnValue.QualifierValue;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.MutateType;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AddColumnRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AssignRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.BalanceRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CatalogScanRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CreateTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteColumnRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DisableTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyColumnRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MoveRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.OfflineRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.SetBalancerRunningRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.UnassignRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetClusterStatusRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetSchemaAlterStatusRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetTableDescriptorsRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Helper utility to build protocol buffer requests,
+ * or build components for protocol buffer requests.
+ */
+@InterfaceAudience.Private
+public final class RequestConverter {
+
+  /** Hidden constructor; the helpers of this final class are static, so it is never instantiated. */
+  private RequestConverter() {}
+
+// Start utilities for Client
+
+/**
+   * Build a protocol buffer GetRequest that asks for all columns of one family of a row.
+   * The request has its closest-row-before flag set, so that when there is no such row
+   * the closest row before it is returned.
+   *
+   * @param regionName the name of the region to get
+   * @param row the row to get
+   * @param family the column family to get
+   * @return a protocol buffer GetRequest
+   */
+  public static GetRequest buildGetRowOrBeforeRequest(
+      final byte[] regionName, final byte[] row, final byte[] family) {
+    final RegionSpecifier regionSpec =
+      buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
+    // Only the family is set on the column, no qualifiers are added.
+    final Column familyOnly =
+      Column.newBuilder().setFamily(ByteString.copyFrom(family)).build();
+    final ClientProtos.Get protoGet = ClientProtos.Get.newBuilder()
+      .setRow(ByteString.copyFrom(row))
+      .addColumn(familyOnly)
+      .build();
+    return GetRequest.newBuilder()
+      .setClosestRowBefore(true)
+      .setRegion(regionSpec)
+      .setGet(protoGet)
+      .build();
+  }
+
+  /**
+   * Build a protocol buffer GetRequest for a client Get. Delegates to the three-argument
+   * overload with the existence-only flag set to false.
+   *
+   * @param regionName the name of the region to get
+   * @param get the client Get
+   * @return a protocol buffer GetRequest
+   * @throws IOException if the three-argument overload throws it
+   */
+  public static GetRequest buildGetRequest(final byte[] regionName,
+      final Get get) throws IOException {
+    final boolean existenceOnly = false;
+    return buildGetRequest(regionName, get, existenceOnly);
+  }
+
+  /**
+   * Build a protocol buffer GetRequest for a client Get.
+   *
+   * @param regionName the name of the region to get
+   * @param get the client Get, converted with ProtobufUtil.toGet
+   * @param existenceOnly indicate if check row existence only
+   * @return a protocol buffer GetRequest
+   * @throws IOException if converting the client Get throws it
+   */
+  public static GetRequest buildGetRequest(final byte[] regionName,
+      final Get get, final boolean existenceOnly) throws IOException {
+    final RegionSpecifier regionSpec =
+      buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
+    return GetRequest.newBuilder()
+      .setExistenceOnly(existenceOnly)
+      .setRegion(regionSpec)
+      .setGet(ProtobufUtil.toGet(get))
+      .build();
+  }
+
+  /**
+   * Build a protocol buffer MultiGetRequest for a list of client Gets. All gets are sent
+   * with the same region specifier, i.e. they are going to be run against the same region.
+   *
+   * @param regionName the name of the region to get from
+   * @param gets the client Gets, added to the request in list order
+   * @param existenceOnly indicate if check rows existence only
+   * @param closestRowBefore value for the request's closest-row-before flag
+   * @return a protocol buffer MultiGetRequest
+   * @throws IOException if converting one of the client Gets throws it
+   */
+  public static MultiGetRequest buildMultiGetRequest(final byte[] regionName, final List<Get> gets,
+      final boolean existenceOnly, final boolean closestRowBefore) throws IOException {
+    final RegionSpecifier regionSpec =
+      buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
+    final MultiGetRequest.Builder request = MultiGetRequest.newBuilder()
+      .setExistenceOnly(existenceOnly)
+      .setClosestRowBefore(closestRowBefore)
+      .setRegion(regionSpec);
+    for (final Get clientGet : gets) {
+      request.addGet(ProtobufUtil.toGet(clientGet));
+    }
+    return request.build();
+  }
+
+  /**
+   * Build a protocol buffer MutateRequest that increments a single column by an amount.
+   *
+   * @param regionName the name of the region the request is addressed to
+   * @param row the row of the mutation
+   * @param family the column family of the cell
+   * @param qualifier the column qualifier of the cell
+   * @param amount the increment amount, sent as the bytes of Bytes.toBytes(long)
+   * @param writeToWAL value for the mutation's write-to-WAL flag
+   * @return a mutate request of type INCREMENT
+   */
+  public static MutateRequest buildMutateRequest(
+      final byte[] regionName, final byte[] row, final byte[] family,
+      final byte [] qualifier, final long amount, final boolean writeToWAL) {
+    final RegionSpecifier regionSpec =
+      buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
+    final Mutate.Builder incrementMutate = Mutate.newBuilder()
+      .setRow(ByteString.copyFrom(row))
+      .setMutateType(MutateType.INCREMENT)
+      .setWriteToWAL(writeToWAL);
+    final ColumnValue.Builder familyValues = ColumnValue.newBuilder()
+      .setFamily(ByteString.copyFrom(family));
+    // A single qualifier/value pair: the value carries the amount.
+    final QualifierValue delta = QualifierValue.newBuilder()
+      .setValue(ByteString.copyFrom(Bytes.toBytes(amount)))
+      .setQualifier(ByteString.copyFrom(qualifier))
+      .build();
+    familyValues.addQualifierValue(delta);
+    incrementMutate.addColumnValue(familyValues.build());
+    return MutateRequest.newBuilder()
+      .setRegion(regionSpec)
+      .setMutate(incrementMutate.build())
+      .build();
+  }
+
+  /**
+   * Build a protocol buffer MutateRequest for a conditioned put: the request carries
+   * both the put and a Condition built from the row/family/qualifier/comparator arguments.
+   *
+   * @param regionName the name of the region the request is addressed to
+   * @param row the row of the condition
+   * @param family the column family of the condition
+   * @param qualifier the column qualifier of the condition
+   * @param comparator the comparator of the condition
+   * @param compareType the compare type of the condition
+   * @param put the client Put to send
+   * @return a mutate request
+   * @throws IOException if building the condition or converting the put throws it
+   */
+  public static MutateRequest buildMutateRequest(
+      final byte[] regionName, final byte[] row, final byte[] family,
+      final byte [] qualifier, final ByteArrayComparable comparator,
+      final CompareType compareType, final Put put) throws IOException {
+    final RegionSpecifier regionSpec =
+      buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
+    final Condition check =
+      buildCondition(row, family, qualifier, comparator, compareType);
+    final Mutate putMutate = ProtobufUtil.toMutate(MutateType.PUT, put);
+    return MutateRequest.newBuilder()
+      .setRegion(regionSpec)
+      .setMutate(putMutate)
+      .setCondition(check)
+      .build();
+  }
+
+  /**
+   * Build a protocol buffer MutateRequest for a conditioned delete: the request carries
+   * both the delete and a Condition built from the row/family/qualifier/comparator arguments.
+   *
+   * @param regionName the name of the region the request is addressed to
+   * @param row the row of the condition
+   * @param family the column family of the condition
+   * @param qualifier the column qualifier of the condition
+   * @param comparator the comparator of the condition
+   * @param compareType the compare type of the condition
+   * @param delete the client Delete to send
+   * @return a mutate request
+   * @throws IOException if building the condition or converting the delete throws it
+   */
+  public static MutateRequest buildMutateRequest(
+      final byte[] regionName, final byte[] row, final byte[] family,
+      final byte [] qualifier, final ByteArrayComparable comparator,
+      final CompareType compareType, final Delete delete) throws IOException {
+    final RegionSpecifier regionSpec =
+      buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
+    final Condition check =
+      buildCondition(row, family, qualifier, comparator, compareType);
+    final Mutate deleteMutate = ProtobufUtil.toMutate(MutateType.DELETE, delete);
+    return MutateRequest.newBuilder()
+      .setRegion(regionSpec)
+      .setMutate(deleteMutate)
+      .setCondition(check)
+      .build();
+  }
+
+  /**
+   * Build a protocol buffer MutateRequest for an unconditional put.
+   *
+   * @param regionName the name of the region the request is addressed to
+   * @param put the client Put, converted with mutate type PUT
+   * @return a mutate request
+   * @throws IOException if converting the put throws it
+   */
+  public static MutateRequest buildMutateRequest(
+      final byte[] regionName, final Put put) throws IOException {
+    final RegionSpecifier regionSpec =
+      buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
+    return MutateRequest.newBuilder()
+      .setRegion(regionSpec)
+      .setMutate(ProtobufUtil.toMutate(MutateType.PUT, put))
+      .build();
+  }
+
+  /**
+   * Build a protocol buffer MutateRequest for an append.
+   *
+   * @param regionName the name of the region the request is addressed to
+   * @param append the client Append, converted with mutate type APPEND
+   * @return a mutate request
+   * @throws IOException if converting the append throws it
+   */
+  public static MutateRequest buildMutateRequest(
+      final byte[] regionName, final Append append) throws IOException {
+    final RegionSpecifier regionSpec =
+      buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
+    return MutateRequest.newBuilder()
+      .setRegion(regionSpec)
+      .setMutate(ProtobufUtil.toMutate(MutateType.APPEND, append))
+      .build();
+  }
+
+  /**
+   * Build a protocol buffer MutateRequest for a client Increment.
+   *
+   * @param regionName the name of the region the request is addressed to
+   * @param increment the client Increment, converted with ProtobufUtil.toMutate
+   * @return a mutate request
+   */
+  public static MutateRequest buildMutateRequest(
+      final byte[] regionName, final Increment increment) {
+    final RegionSpecifier regionSpec =
+      buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
+    return MutateRequest.newBuilder()
+      .setRegion(regionSpec)
+      .setMutate(ProtobufUtil.toMutate(increment))
+      .build();
+  }
+
+  /**
+   * Build a protocol buffer MutateRequest for an unconditional delete.
+   *
+   * @param regionName the name of the region the request is addressed to
+   * @param delete the client Delete, converted with mutate type DELETE
+   * @return a mutate request
+   * @throws IOException if converting the delete throws it
+   */
+  public static MutateRequest buildMutateRequest(
+      final byte[] regionName, final Delete delete) throws IOException {
+    final RegionSpecifier regionSpec =
+      buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
+    return MutateRequest.newBuilder()
+      .setRegion(regionSpec)
+      .setMutate(ProtobufUtil.toMutate(MutateType.DELETE, delete))
+      .build();
+  }
+
+  /**
+   * Build a protocol buffer MultiRequest for a RowMutations. The request is flagged
+   * atomic and gets one action per mutation, in the order the mutations are returned.
+   *
+   * @param regionName the name of the region the request is addressed to
+   * @param rowMutations the mutations to send; each must be a Put or a Delete
+   * @return a multi request
+   * @throws IOException a DoNotRetryIOException if a mutation is neither a Put nor a
+   *   Delete, or whatever converting a mutation throws
+   */
+  public static MultiRequest buildMultiRequest(final byte[] regionName,
+      final RowMutations rowMutations) throws IOException {
+    final RegionSpecifier regionSpec =
+      buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
+    final MultiRequest.Builder request = MultiRequest.newBuilder()
+      .setRegion(regionSpec)
+      .setAtomic(true);
+    for (final Mutation mutation : rowMutations.getMutations()) {
+      final boolean isPut = mutation instanceof Put;
+      // Reject anything other than the two supported mutation kinds up front.
+      if (!isPut && !(mutation instanceof Delete)) {
+        throw new DoNotRetryIOException(
+          "RowMutations supports only put and delete, not "
+            + mutation.getClass().getName());
+      }
+      final MutateType mutateType = isPut ? MutateType.PUT : MutateType.DELETE;
+      final Mutate protoMutate = ProtobufUtil.toMutate(mutateType, mutation);
+      request.addAction(MultiAction.newBuilder().setMutate(protoMutate).build());
+    }
+    return request.build();
+  }
+
+  /**
+   * Build a protocol buffer ScanRequest for a client Scan against a region.
+   *
+   * @param regionName the name of the region the request is addressed to
+   * @param scan the client Scan, converted with ProtobufUtil.toScan
+   * @param numberOfRows value for the request's number-of-rows field
+   * @param closeScanner value for the request's close-scanner flag
+   * @return a scan request
+   * @throws IOException if converting the scan throws it
+   */
+  public static ScanRequest buildScanRequest(final byte[] regionName,
+      final Scan scan, final int numberOfRows,
+        final boolean closeScanner) throws IOException {
+    final RegionSpecifier regionSpec =
+      buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
+    return ScanRequest.newBuilder()
+      .setNumberOfRows(numberOfRows)
+      .setCloseScanner(closeScanner)
+      .setRegion(regionSpec)
+      .setScan(ProtobufUtil.toScan(scan))
+      .build();
+  }
+
+  /**
+   * Build a protocol buffer ScanRequest that refers to a scanner by its id.
+   *
+   * @param scannerId the id of the scanner
+   * @param numberOfRows value for the request's number-of-rows field
+   * @param closeScanner value for the request's close-scanner flag
+   * @return a scan request
+   */
+  public static ScanRequest buildScanRequest(final long scannerId,
+      final int numberOfRows, final boolean closeScanner) {
+    return ScanRequest.newBuilder()
+      .setNumberOfRows(numberOfRows)
+      .setCloseScanner(closeScanner)
+      .setScannerId(scannerId)
+      .build();
+  }
+  
+  /**
+   * Build a protocol buffer ScanRequest that refers to a scanner by its id and also
+   * carries a next-call sequence number.
+   *
+   * @param scannerId the id of the scanner
+   * @param numberOfRows value for the request's number-of-rows field
+   * @param closeScanner value for the request's close-scanner flag
+   * @param nextCallSeq value for the request's next-call sequence field
+   * @return a scan request
+   */
+  public static ScanRequest buildScanRequest(final long scannerId, final int numberOfRows,
+      final boolean closeScanner, final long nextCallSeq) {
+    return ScanRequest.newBuilder()
+      .setNumberOfRows(numberOfRows)
+      .setCloseScanner(closeScanner)
+      .setScannerId(scannerId)
+      .setNextCallSeq(nextCallSeq)
+      .build();
+  }
+
+  /**
+   * Build a protocol buffer bulk load request.
+   *
+   * @param familyPaths pairs of (column family, path); one FamilyPath entry is added
+   *   to the request per pair, in list order
+   * @param regionName the name of the region the request is addressed to
+   * @param assignSeqNum value for the request's assign-sequence-number flag
+   * @return a bulk load request
+   */
+  public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
+      final List<Pair<byte[], String>> familyPaths,
+      final byte[] regionName, boolean assignSeqNum) {
+    final RegionSpecifier regionSpec =
+      buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
+    final BulkLoadHFileRequest.Builder request = BulkLoadHFileRequest.newBuilder()
+      .setRegion(regionSpec);
+    for (final Pair<byte[], String> entry : familyPaths) {
+      final FamilyPath protoFamilyPath = FamilyPath.newBuilder()
+        .setFamily(ByteString.copyFrom(entry.getFirst()))
+        .setPath(entry.getSecond())
+        .build();
+      request.addFamilyPath(protoFamilyPath);
+    }
+    return request.setAssignSeqNum(assignSeqNum).build();
+  }
+
+  /**
+   * Build a protocol buffer multi request for a list of actions. Get, Put, Delete,
+   * Append and Increment rows each become one action of the request.
+   * RowMutations in the list (if any) will be ignored.
+   *
+   * @param regionName the name of the region the request is addressed to
+   * @param actions the client actions to convert
+   * @return a multi request
+   * @throws IOException a DoNotRetryIOException for a row type not listed above,
+   *   or whatever converting a row throws
+   */
+  public static <R> MultiRequest buildMultiRequest(final byte[] regionName,
+      final List<Action<R>> actions) throws IOException {
+    final RegionSpecifier regionSpec =
+      buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
+    final MultiRequest.Builder request = MultiRequest.newBuilder().setRegion(regionSpec);
+    for (final Action<R> action : actions) {
+      final Row operation = action.getAction();
+      if (operation instanceof RowMutations) {
+        continue; // ignore RowMutations
+      }
+      final MultiAction.Builder protoAction = MultiAction.newBuilder();
+      if (operation instanceof Get) {
+        protoAction.setGet(ProtobufUtil.toGet((Get) operation));
+      } else if (operation instanceof Put) {
+        protoAction.setMutate(ProtobufUtil.toMutate(MutateType.PUT, (Put) operation));
+      } else if (operation instanceof Delete) {
+        protoAction.setMutate(ProtobufUtil.toMutate(MutateType.DELETE, (Delete) operation));
+      } else if (operation instanceof Append) {
+        protoAction.setMutate(ProtobufUtil.toMutate(MutateType.APPEND, (Append) operation));
+      } else if (operation instanceof Increment) {
+        protoAction.setMutate(ProtobufUtil.toMutate((Increment) operation));
+      } else {
+        throw new DoNotRetryIOException(
+          "multi doesn't support " + operation.getClass().getName());
+      }
+      request.addAction(protoAction.build());
+    }
+    return request.build();
+  }
+
+// End utilities for Client
+//Start utilities for Admin
+
+  /**
+   * Build a protocol buffer GetRegionInfoRequest for a given region name. Delegates to
+   * the two-argument overload without requesting the compaction state.
+   *
+   * @param regionName the name of the region to get info
+   * @return a protocol buffer GetRegionInfoRequest
+   */
+  public static GetRegionInfoRequest
+      buildGetRegionInfoRequest(final byte[] regionName) {
+    final boolean includeCompactionState = false;
+    return buildGetRegionInfoRequest(regionName, includeCompactionState);
+  }
+
+  /**
+   * Build a protocol buffer GetRegionInfoRequest for a given region name.
+   *
+   * @param regionName the name of the region to get info
+   * @param includeCompactionState indicate if the compaction state is requested; the
+   *   request field is only set when this is true and is left unset otherwise
+   * @return a protocol buffer GetRegionInfoRequest
+   */
+  public static GetRegionInfoRequest
+      buildGetRegionInfoRequest(final byte[] regionName,
+        final boolean includeCompactionState) {
+    final RegionSpecifier regionSpec =
+      buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
+    final GetRegionInfoRequest.Builder request =
+      GetRegionInfoRequest.newBuilder().setRegion(regionSpec);
+    if (includeCompactionState) {
+      request.setCompactionState(true);
+    }
+    return request.build();
+  }
+
+ /**
+  * Build a protocol buffer GetStoreFileRequest for one family of a given region.
+  *
+  * @param regionName the name of the region to get info
+  * @param family the family to get store file list; added as the only family entry
+  * @return a protocol buffer GetStoreFileRequest
+  */
+ public static GetStoreFileRequest
+     buildGetStoreFileRequest(final byte[] regionName, final byte[] family) {
+   final RegionSpecifier regionSpec =
+     buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
+   return GetStoreFileRequest.newBuilder()
+     .setRegion(regionSpec)
+     .addFamily(ByteString.copyFrom(family))
+     .build();
+ }
+
+ /**
+  * Build a protocol buffer GetOnlineRegionRequest; no field of the request is set.
+  *
+  * @return a protocol buffer GetOnlineRegionRequest
+  */
+ public static GetOnlineRegionRequest buildGetOnlineRegionRequest() {
+   final GetOnlineRegionRequest.Builder request = GetOnlineRegionRequest.newBuilder();
+   return request.build();
+ }
+
+ /**
+  * Build a protocol buffer FlushRegionRequest for a given region name.
+  *
+  * @param regionName the name of the region to flush
+  * @return a protocol buffer FlushRegionRequest
+  */
+ public static FlushRegionRequest
+     buildFlushRegionRequest(final byte[] regionName) {
+   final RegionSpecifier regionSpec =
+     buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
+   return FlushRegionRequest.newBuilder().setRegion(regionSpec).build();
+ }
+
+ /**
+  * Build a protocol buffer OpenRegionRequest to open a list of regions.
+  *
+  * @param regionOpenInfos pairs of (region, version of offline node) for the regions
+  *   to open; a null version is sent as -1
+  * @return a protocol buffer OpenRegionRequest
+  */
+ public static OpenRegionRequest
+     buildOpenRegionRequest(final List<Pair<HRegionInfo, Integer>> regionOpenInfos) {
+   final OpenRegionRequest.Builder request = OpenRegionRequest.newBuilder();
+   for (final Pair<HRegionInfo, Integer> entry : regionOpenInfos) {
+     final Integer offlineVersion = entry.getSecond();
+     int versionOfOfflineNode = -1;
+     if (offlineVersion != null) {
+       versionOfOfflineNode = offlineVersion.intValue();
+     }
+     request.addOpenInfo(buildRegionOpenInfo(entry.getFirst(), versionOfOfflineNode));
+   }
+   return request.build();
+ }
+
+ /**
+  * Build a protocol buffer OpenRegionRequest holding a single open-info entry.
+  *
+  * @param region the region to open
+  * @param versionOfOfflineNode that needs to be present in the offline node
+  * @return a protocol buffer OpenRegionRequest
+  */
+ public static OpenRegionRequest buildOpenRegionRequest(
+     final HRegionInfo region, final int versionOfOfflineNode) {
+   return OpenRegionRequest.newBuilder()
+     .addOpenInfo(buildRegionOpenInfo(region, versionOfOfflineNode))
+     .build();
+ }
+
+ /**
+  * Build a CloseRegionRequest for a region identified by its region name.
+  *
+  * @param regionName the name of the region to close
+  * @param transitionInZK indicator if to transition in ZK
+  * @return a CloseRegionRequest
+  */
+ public static CloseRegionRequest buildCloseRegionRequest(
+     final byte[] regionName, final boolean transitionInZK) {
+   final RegionSpecifier regionSpec =
+     buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
+   return CloseRegionRequest.newBuilder()
+     .setRegion(regionSpec)
+     .setTransitionInZK(transitionInZK)
+     .build();
+ }
+
+  /**
+   * Build a CloseRegionRequest for a region identified by its region name, carrying
+   * the version of the closing node and, optionally, a destination server.
+   *
+   * @param regionName the name of the region to close
+   * @param versionOfClosingNode value for the request's version-of-closing-node field
+   * @param destinationServer the destination server; the field is left unset when null
+   * @param transitionInZK indicator if to transition in ZK
+   * @return a CloseRegionRequest
+   */
+  public static CloseRegionRequest buildCloseRegionRequest(
+    final byte[] regionName, final int versionOfClosingNode,
+    ServerName destinationServer, final boolean transitionInZK) {
+    final RegionSpecifier regionSpec =
+      buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
+    final CloseRegionRequest.Builder request = CloseRegionRequest.newBuilder()
+      .setRegion(regionSpec)
+      .setVersionOfClosingNode(versionOfClosingNode)
+      .setTransitionInZK(transitionInZK);
+    if (destinationServer == null) {
+      return request.build();
+    }
+    return request
+      .setDestinationServer(ProtobufUtil.toServerName(destinationServer))
+      .build();
+  }
+
+ /**
+  * Build a CloseRegionRequest for a region identified by its encoded region name.
+  *
+  * @param encodedRegionName the encoded name of the region to close; converted to
+  *   bytes with Bytes.toBytes and sent with specifier type ENCODED_REGION_NAME
+  * @param transitionInZK indicator if to transition in ZK
+  * @return a CloseRegionRequest
+  */
+ public static CloseRegionRequest
+     buildCloseRegionRequest(final String encodedRegionName,
+       final boolean transitionInZK) {
+   final byte[] encodedNameBytes = Bytes.toBytes(encodedRegionName);
+   final RegionSpecifier regionSpec =
+     buildRegionSpecifier(RegionSpecifierType.ENCODED_REGION_NAME, encodedNameBytes);
+   return CloseRegionRequest.newBuilder()
+     .setRegion(regionSpec)
+     .setTransitionInZK(transitionInZK)
+     .build();
+ }
+
+ /**
+  * Build a SplitRegionRequest for a given region name.
+  *
+  * @param regionName the name of the region to split
+  * @param splitPoint the split point; the field is left unset when this is null
+  * @return a SplitRegionRequest
+  */
+ public static SplitRegionRequest buildSplitRegionRequest(
+     final byte[] regionName, final byte[] splitPoint) {
+   final RegionSpecifier regionSpec =
+     buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
+   final SplitRegionRequest.Builder request =
+     SplitRegionRequest.newBuilder().setRegion(regionSpec);
+   if (splitPoint == null) {
+     return request.build();
+   }
+   return request.setSplitPoint(ByteString.copyFrom(splitPoint)).build();
+ }
+
+ /**
+  * Build a CompactRegionRequest for a given region name.
+  *
+  * @param regionName the name of the region to compact
+  * @param major indicator if it is a major compaction
+  * @param family the column family for the request; the field is left unset when
+  *   this is null
+  * @return a CompactRegionRequest
+  */
+ public static CompactRegionRequest buildCompactRegionRequest(
+     final byte[] regionName, final boolean major, final byte [] family) {
+   final RegionSpecifier regionSpec =
+     buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
+   final CompactRegionRequest.Builder request = CompactRegionRequest.newBuilder()
+     .setRegion(regionSpec)
+     .setMajor(major);
+   if (family == null) {
+     return request.build();
+   }
+   return request.setFamily(ByteString.copyFrom(family)).build();
+ }
+
+  /**
+   * Build a new RollWALWriterRequest; no field of the request is set.
+   *
+   * @return a RollWALWriterRequest
+   */
+  public static RollWALWriterRequest buildRollWALWriterRequest() {
+    return RollWALWriterRequest.newBuilder().build();
+  }
+
+ /**
+  * Build a new GetServerInfoRequest; no field of the request is set.
+  *
+  * @return a GetServerInfoRequest
+  */
+ public static GetServerInfoRequest buildGetServerInfoRequest() {
+   return GetServerInfoRequest.newBuilder().build();
+ }
+
+ /**
+  * Build a new StopServerRequest carrying the given reason.
+  *
+  * @param reason the reason to stop the server
+  * @return a StopServerRequest
+  */
+ public static StopServerRequest buildStopServerRequest(final String reason) {
+   return StopServerRequest.newBuilder().setReason(reason).build();
+ }
+
+//End utilities for Admin
+
+  /**
+   * Wrap a byte array and a specifier type into a protocol buffer RegionSpecifier.
+   *
+   * @param type the region specifier type
+   * @param value the region specifier byte array value; its bytes are copied
+   * @return a protocol buffer RegionSpecifier
+   */
+  public static RegionSpecifier buildRegionSpecifier(
+      final RegionSpecifierType type, final byte[] value) {
+    return RegionSpecifier.newBuilder()
+      .setValue(ByteString.copyFrom(value))
+      .setType(type)
+      .build();
+  }
+
+  /**
+   * Build a protocol buffer Condition from its five components.
+   *
+   * @param row the row of the condition
+   * @param family the column family of the condition
+   * @param qualifier the column qualifier of the condition
+   * @param comparator the comparator, converted with ProtobufUtil.toComparator
+   * @param compareType the compare type of the condition
+   * @return a Condition
+   * @throws IOException declared for callers; not thrown explicitly in this method
+   */
+  private static Condition buildCondition(final byte[] row,
+      final byte[] family, final byte [] qualifier,
+      final ByteArrayComparable comparator,
+      final CompareType compareType) throws IOException {
+    return Condition.newBuilder()
+      .setRow(ByteString.copyFrom(row))
+      .setFamily(ByteString.copyFrom(family))
+      .setQualifier(ByteString.copyFrom(qualifier))
+      .setComparator(ProtobufUtil.toComparator(comparator))
+      .setCompareType(compareType)
+      .build();
+  }
+
+  /**
+   * Builds a protocol buffer AddColumnRequest.
+   *
+   * @param tableName the table name bytes; copied into the request
+   * @param column the column family descriptor; stored in its converted
+   *   (protocol buffer) form
+   * @return an AddColumnRequest
+   */
+  public static AddColumnRequest buildAddColumnRequest(
+      final byte [] tableName, final HColumnDescriptor column) {
+    return AddColumnRequest.newBuilder()
+      .setTableName(ByteString.copyFrom(tableName))
+      .setColumnFamilies(column.convert())
+      .build();
+  }
+
+  /**
+   * Builds a protocol buffer DeleteColumnRequest.
+   *
+   * @param tableName the table name bytes; copied into the request
+   * @param columnName the column name bytes; copied into the request
+   * @return a DeleteColumnRequest
+   */
+  public static DeleteColumnRequest buildDeleteColumnRequest(
+      final byte [] tableName, final byte [] columnName) {
+    return DeleteColumnRequest.newBuilder()
+      .setTableName(ByteString.copyFrom(tableName))
+      .setColumnName(ByteString.copyFrom(columnName))
+      .build();
+  }
+
+  /**
+   * Builds a protocol buffer ModifyColumnRequest.
+   *
+   * @param tableName the table name bytes; copied into the request
+   * @param column the column family descriptor; stored in its converted
+   *   (protocol buffer) form
+   * @return a ModifyColumnRequest
+   */
+  public static ModifyColumnRequest buildModifyColumnRequest(
+      final byte [] tableName, final HColumnDescriptor column) {
+    return ModifyColumnRequest.newBuilder()
+      .setTableName(ByteString.copyFrom(tableName))
+      .setColumnFamilies(column.convert())
+      .build();
+  }
+
+  /**
+   * Builds a protocol buffer MoveRegionRequest.
+   *
+   * @param encodedRegionName the encoded region name; wrapped in a
+   *   RegionSpecifier of type ENCODED_REGION_NAME
+   * @param destServerName the destination server name as bytes, or null;
+   *   when null no destination is set on the request
+   * @return A MoveRegionRequest
+   * @throws DeserializationException declared by the signature
+   *   (NOTE(review): not obviously thrown by the calls made here - confirm)
+   */
+  public static MoveRegionRequest buildMoveRegionRequest(
+      final byte [] encodedRegionName, final byte [] destServerName) throws
+      DeserializationException {
+    MoveRegionRequest.Builder request = MoveRegionRequest.newBuilder();
+    request.setRegion(
+      buildRegionSpecifier(RegionSpecifierType.ENCODED_REGION_NAME, encodedRegionName));
+    if (destServerName == null) {
+      // No destination requested: leave the field unset.
+      return request.build();
+    }
+    final ServerName destination = new ServerName(Bytes.toString(destServerName));
+    return request.setDestServerName(ProtobufUtil.toServerName(destination)).build();
+  }
+
+  /**
+   * Builds a protocol buffer AssignRegionRequest.
+   *
+   * @param regionName the region name; wrapped in a RegionSpecifier of type
+   *   REGION_NAME
+   * @return an AssignRegionRequest
+   */
+  public static AssignRegionRequest buildAssignRegionRequest(final byte [] regionName) {
+    final RegionSpecifier region =
+      buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
+    return AssignRegionRequest.newBuilder().setRegion(region).build();
+  }
+
+  /**
+   * Builds a protocol buffer UnassignRegionRequest.
+   *
+   * @param regionName the region name; wrapped in a RegionSpecifier of type
+   *   REGION_NAME
+   * @param force the value stored in the request's force field
+   * @return an UnassignRegionRequest
+   */
+  public static UnassignRegionRequest buildUnassignRegionRequest(
+      final byte [] regionName, final boolean force) {
+    final RegionSpecifier region =
+      buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
+    return UnassignRegionRequest.newBuilder()
+      .setRegion(region)
+      .setForce(force)
+      .build();
+  }
+
+  /**
+   * Builds a protocol buffer OfflineRegionRequest.
+   *
+   * @param regionName the region name; wrapped in a RegionSpecifier of type
+   *   REGION_NAME
+   * @return an OfflineRegionRequest
+   */
+  public static OfflineRegionRequest buildOfflineRegionRequest(final byte [] regionName) {
+    final RegionSpecifier region =
+      buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
+    return OfflineRegionRequest.newBuilder().setRegion(region).build();
+  }
+
+  /**
+   * Builds a protocol buffer DeleteTableRequest.
+   *
+   * @param tableName the table name bytes; copied into the request
+   * @return a DeleteTableRequest
+   */
+  public static DeleteTableRequest buildDeleteTableRequest(final byte [] tableName) {
+    final ByteString name = ByteString.copyFrom(tableName);
+    return DeleteTableRequest.newBuilder().setTableName(name).build();
+  }
+
+  /**
+   * Builds a protocol buffer EnableTableRequest.
+   *
+   * @param tableName the table name bytes; copied into the request
+   * @return an EnableTableRequest
+   */
+  public static EnableTableRequest buildEnableTableRequest(final byte [] tableName) {
+    final ByteString name = ByteString.copyFrom(tableName);
+    return EnableTableRequest.newBuilder().setTableName(name).build();
+  }
+
+  /**
+   * Builds a protocol buffer DisableTableRequest.
+   *
+   * @param tableName the table name bytes; copied into the request
+   * @return a DisableTableRequest
+   */
+  public static DisableTableRequest buildDisableTableRequest(final byte [] tableName) {
+    final ByteString name = ByteString.copyFrom(tableName);
+    return DisableTableRequest.newBuilder().setTableName(name).build();
+  }
+
+  /**
+   * Builds a protocol buffer CreateTableRequest.
+   *
+   * @param hTableDesc the table descriptor; stored in its converted
+   *   (protocol buffer) form as the table schema
+   * @param splitKeys the split keys, or null; each key is copied into the
+   *   request in array order, and a null array adds no split keys
+   * @return a CreateTableRequest
+   */
+  public static CreateTableRequest buildCreateTableRequest(
+      final HTableDescriptor hTableDesc, final byte [][] splitKeys) {
+    CreateTableRequest.Builder request = CreateTableRequest.newBuilder();
+    request.setTableSchema(hTableDesc.convert());
+    if (splitKeys == null) {
+      return request.build();
+    }
+    for (int i = 0; i < splitKeys.length; i++) {
+      request.addSplitKeys(ByteString.copyFrom(splitKeys[i]));
+    }
+    return request.build();
+  }
+
+
+  /**
+   * Builds a protocol buffer ModifyTableRequest.
+   *
+   * @param table the table name bytes; copied into the request
+   * @param hTableDesc the table descriptor; stored in its converted
+   *   (protocol buffer) form as the table schema
+   * @return a ModifyTableRequest
+   */
+  public static ModifyTableRequest buildModifyTableRequest(
+      final byte [] table, final HTableDescriptor hTableDesc) {
+    return ModifyTableRequest.newBuilder()
+      .setTableName(ByteString.copyFrom(table))
+      .setTableSchema(hTableDesc.convert())
+      .build();
+  }
+
+  /**
+   * Builds a protocol buffer GetSchemaAlterStatusRequest.
+   *
+   * @param tableName the table name bytes; copied into the request
+   * @return a GetSchemaAlterStatusRequest
+   */
+  public static GetSchemaAlterStatusRequest buildGetSchemaAlterStatusRequest(
+      final byte [] tableName) {
+    final ByteString name = ByteString.copyFrom(tableName);
+    return GetSchemaAlterStatusRequest.newBuilder().setTableName(name).build();
+  }
+
+  /**
+   * Builds a protocol buffer GetTableDescriptorsRequest.
+   *
+   * @param tableNames the table names to add to the request, in list order;
+   *   a null list yields a request with no table names
+   * @return a GetTableDescriptorsRequest
+   */
+  public static GetTableDescriptorsRequest buildGetTableDescriptorsRequest(
+      final List<String> tableNames) {
+    GetTableDescriptorsRequest.Builder request = GetTableDescriptorsRequest.newBuilder();
+    if (tableNames == null) {
+      return request.build();
+    }
+    for (String name : tableNames) {
+      request.addTableNames(name);
+    }
+    return request.build();
+  }
+
+  /**
+   * Creates a protocol buffer IsMasterRunningRequest
+   *
+   * @return a IsMasterRunningRequest
+   */
+  public static IsMasterRunningRequest buildIsMasterRunningRequest() {
+    return IsMasterRunningRequest.newBuilder().build();
+  }
+
+  /**
+   * Creates a protocol buffer BalanceRequest
+   *
+   * @return a BalanceRequest
+   */
+  public static BalanceRequest buildBalanceRequest() {
+    return BalanceRequest.newBuilder().build();
+  }
+
+  /**
+   * Creates a protocol buffer SetBalancerRunningRequest
+   *
+   * @param on
+   * @param synchronous
+   * @return a SetBalancerRunningRequest
+   */
+  public static SetBalancerRunningRequest buildSetBalancerRunningRequest(boolean on, boolean synchronous) {
+    return SetBalancerRunningRequest.newBuilder().setOn(on).setSynchronous(synchronous).build();
+  }
+
+  /**
+   * Creates a protocol buffer GetClusterStatusRequest
+   *
+   * @return A GetClusterStatusRequest
+   */
+  public static GetClusterStatusRequest buildGetClusterStatusRequest() {
+    return GetClusterStatusRequest.newBuilder().build();
+  }
+
+  /**
+   * Creates a request for running a catalog scan
+   * @return A {@link CatalogScanRequest}
+   */
+  public static CatalogScanRequest buildCatalogScanRequest() {
+    return CatalogScanRequest.newBuilder().build();
+  }
+
+  /**
+   * Creates a request for enabling/disabling the catalog janitor
+   * @return A {@link EnableCatalogJanitorRequest}
+   */
+  public static EnableCatalogJanitorRequest buildEnableCatalogJanitorRequest(boolean enable) {
+    return EnableCatalogJanitorRequest.newBuilder().setEnable(enable).build();
+  }
+
+  /**
+   * Creates a request for querying the master whether the catalog janitor is enabled
+   * @return A {@link IsCatalogJanitorEnabledRequest}
+   */
+  public static IsCatalogJanitorEnabledRequest buildIsCatalogJanitorEnabledRequest() {
+    return IsCatalogJanitorEnabledRequest.newBuilder().build();
+  }
+
+  /**
+   * Builds the request used to ask the master for the last flushed sequence Id
+   * of a region.
+   * @param regionName the region name bytes; copied into the request
+   * @return A {@link GetLastFlushedSequenceIdRequest}
+   */
+  public static GetLastFlushedSequenceIdRequest buildGetLastFlushedSequenceIdRequest(
+      byte[] regionName) {
+    GetLastFlushedSequenceIdRequest.Builder request =
+      GetLastFlushedSequenceIdRequest.newBuilder();
+    request.setRegionName(ByteString.copyFrom(regionName));
+    return request.build();
+  }
+
+  /**
+   * Builds a request to grant permissions to a user.
+   *
+   * @param username the short name of the user to grant permissions to;
+   *   stored UTF-8 encoded
+   * @param table optional table name the permissions apply to; left unset when null
+   * @param family optional column family; left unset when null
+   * @param qualifier optional qualifier; left unset when null
+   * @param actions the permissions to be granted, added in the given order
+   * @return A {@link AccessControlProtos} GrantRequest
+   */
+  public static AccessControlProtos.GrantRequest buildGrantRequest(
+      String username, byte[] table, byte[] family, byte[] qualifier,
+      AccessControlProtos.Permission.Action... actions) {
+    AccessControlProtos.Permission.Builder permission =
+        AccessControlProtos.Permission.newBuilder();
+    for (int i = 0; i < actions.length; i++) {
+      permission.addAction(actions[i]);
+    }
+    // The scope fields are optional: only the ones supplied are set.
+    if (table != null) {
+      permission.setTable(ByteString.copyFrom(table));
+    }
+    if (family != null) {
+      permission.setFamily(ByteString.copyFrom(family));
+    }
+    if (qualifier != null) {
+      permission.setQualifier(ByteString.copyFrom(qualifier));
+    }
+
+    AccessControlProtos.GrantRequest.Builder request =
+        AccessControlProtos.GrantRequest.newBuilder();
+    AccessControlProtos.UserPermission.Builder userPermission =
+        AccessControlProtos.UserPermission.newBuilder()
+            .setUser(ByteString.copyFromUtf8(username))
+            .setPermission(permission.build());
+    return request.setPermission(userPermission).build();
+  }
+
+  /**
+   * Builds a request to revoke permissions from a user.
+   *
+   * @param username the short name of the user whose permissions are revoked;
+   *   stored UTF-8 encoded
+   * @param table optional table name the permissions apply to; left unset when null
+   * @param family optional column family; left unset when null
+   * @param qualifier optional qualifier; left unset when null
+   * @param actions the permissions to be revoked, added in the given order
+   * @return A {@link AccessControlProtos} RevokeRequest
+   */
+  public static AccessControlProtos.RevokeRequest buildRevokeRequest(
+      String username, byte[] table, byte[] family, byte[] qualifier,
+      AccessControlProtos.Permission.Action... actions) {
+    AccessControlProtos.Permission.Builder permission =
+        AccessControlProtos.Permission.newBuilder();
+    for (int i = 0; i < actions.length; i++) {
+      permission.addAction(actions[i]);
+    }
+    // The scope fields are optional: only the ones supplied are set.
+    if (table != null) {
+      permission.setTable(ByteString.copyFrom(table));
+    }
+    if (family != null) {
+      permission.setFamily(ByteString.copyFrom(family));
+    }
+    if (qualifier != null) {
+      permission.setQualifier(ByteString.copyFrom(qualifier));
+    }
+
+    AccessControlProtos.RevokeRequest.Builder request =
+        AccessControlProtos.RevokeRequest.newBuilder();
+    AccessControlProtos.UserPermission.Builder userPermission =
+        AccessControlProtos.UserPermission.newBuilder()
+            .setUser(ByteString.copyFromUtf8(username))
+            .setPermission(permission.build());
+    return request.setPermission(userPermission).build();
+  }
+
+  /**
+   * Builds a RegionOpenInfo from a region info and the version of its offline node.
+   *
+   * @param region the region info; stored in its converted (protocol buffer) form
+   * @param versionOfOfflineNode the version of the offline node; only recorded
+   *   when it is zero or greater, otherwise the field is left unset
+   * @return a RegionOpenInfo
+   */
+  private static RegionOpenInfo buildRegionOpenInfo(
+      final HRegionInfo region, final int versionOfOfflineNode) {
+    RegionOpenInfo.Builder openInfo = RegionOpenInfo.newBuilder();
+    openInfo.setRegion(HRegionInfo.convert(region));
+    if (versionOfOfflineNode < 0) {
+      return openInfo.build();
+    }
+    return openInfo.setVersionOfOfflineNode(versionOfOfflineNode).build();
+  }
+}

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.protobuf;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.RpcController;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.UserPermissionsResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ActionResult;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CatalogScanResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorResponse;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
+import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
+import org.apache.hadoop.hbase.security.access.UserPermission;
+import org.apache.hadoop.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility to build protocol buffer responses,
+ * or retrieve data from protocol buffer responses.
+ */
+@InterfaceAudience.Private
+public final class ResponseConverter {
+
+  private ResponseConverter() {
+  }
+
+// Start utilities for Client
+
+  /**
+   * Get the client Results from a protocol buffer ScanResponse
+   *
+   * @param response the protocol buffer ScanResponse
+   * @return the client Results in the response
+   */
+  public static Result[] getResults(final ScanResponse response) {
+    if (response == null) return null;
+    int count = response.getResultCount();
+    Result[] results = new Result[count];
+    for (int i = 0; i < count; i++) {
+      results[i] = ProtobufUtil.toResult(response.getResult(i));
+    }
+    return results;
+  }
+
+  /**
+   * Get the results from a protocol buffer MultiResponse
+   *
+   * @param proto the protocol buffer MultiResponse to convert
+   * @return the results in the MultiResponse
+   * @throws IOException
+   */
+  public static List<Object> getResults(
+      final ClientProtos.MultiResponse proto) throws IOException {
+    List<Object> results = new ArrayList<Object>();
+    List<ActionResult> resultList = proto.getResultList();
+    for (int i = 0, n = resultList.size(); i < n; i++) {
+      ActionResult result = resultList.get(i);
+      if (result.hasException()) {
+        results.add(ProtobufUtil.toException(result.getException()));
+      } else if (result.hasValue()) {
+        ClientProtos.Result r = result.getValue();
+        Object value = ProtobufUtil.toResult(r);
+        if (value instanceof ClientProtos.Result) {
+          results.add(ProtobufUtil.toResult((ClientProtos.Result)value));
+        } else {
+          results.add(value);
+        }
+      } else {
+        results.add(new Result());
+      }
+    }
+    return results;
+  }
+
+  /**
+   * Wrap a throwable to an action result.
+   *
+   * @param t
+   * @return an action result
+   */
+  public static ActionResult buildActionResult(final Throwable t) {
+    ActionResult.Builder builder = ActionResult.newBuilder();
+    NameBytesPair.Builder parameterBuilder = NameBytesPair.newBuilder();
+    parameterBuilder.setName(t.getClass().getName());
+    parameterBuilder.setValue(
+      ByteString.copyFromUtf8(StringUtils.stringifyException(t)));
+    builder.setException(parameterBuilder.build());
+    return builder.build();
+  }
+
+  /**
+   * Converts the permissions list into a protocol buffer UserPermissionsResponse
+   */
+  public static UserPermissionsResponse buildUserPermissionsResponse(
+      final List<UserPermission> permissions) {
+    UserPermissionsResponse.Builder builder = UserPermissionsResponse.newBuilder();
+    for (UserPermission perm : permissions) {
+      builder.addPermission(ProtobufUtil.toUserPermission(perm));
+    }
+    return builder.build();
+  }
+
+// End utilities for Client
+// Start utilities for Admin
+
+  /**
+   * Get the list of regions to flush from a RollLogWriterResponse
+   *
+   * @param proto the RollLogWriterResponse
+   * @return the the list of regions to flush
+   */
+  public static byte[][] getRegions(final RollWALWriterResponse proto) {
+    if (proto == null || proto.getRegionToFlushCount() == 0) return null;
+    List<byte[]> regions = new ArrayList<byte[]>();
+    for (ByteString region: proto.getRegionToFlushList()) {
+      regions.add(region.toByteArray());
+    }
+    return (byte[][])regions.toArray();
+  }
+
+  /**
+   * Get the list of region info from a GetOnlineRegionResponse
+   *
+   * @param proto the GetOnlineRegionResponse
+   * @return the list of region info
+   */
+  public static List<HRegionInfo> getRegionInfos(final GetOnlineRegionResponse proto) {
+    if (proto == null || proto.getRegionInfoCount() == 0) return null;
+    return ProtobufUtil.getRegionInfos(proto);
+  }
+
+  /**
+   * Get the region opening state from a OpenRegionResponse
+   *
+   * @param proto the OpenRegionResponse
+   * @return the region opening state
+   */
+  public static RegionOpeningState getRegionOpeningState
+      (final OpenRegionResponse proto) {
+    if (proto == null || proto.getOpeningStateCount() != 1) return null;
+    return RegionOpeningState.valueOf(
+      proto.getOpeningState(0).name());
+  }
+
+  /**
+   * Get a list of region opening state from a OpenRegionResponse
+   * 
+   * @param proto the OpenRegionResponse
+   * @return the list of region opening state
+   */
+  public static List<RegionOpeningState> getRegionOpeningStateList(
+      final OpenRegionResponse proto) {
+    if (proto == null) return null;
+    List<RegionOpeningState> regionOpeningStates = new ArrayList<RegionOpeningState>();
+    for (int i = 0; i < proto.getOpeningStateCount(); i++) {
+      regionOpeningStates.add(RegionOpeningState.valueOf(
+          proto.getOpeningState(i).name()));
+    }
+    return regionOpeningStates;
+  }
+
+  /**
+   * Check if the region is closed from a CloseRegionResponse
+   *
+   * @param proto the CloseRegionResponse
+   * @return the region close state
+   */
+  public static boolean isClosed
+      (final CloseRegionResponse proto) {
+    if (proto == null || !proto.hasClosed()) return false;
+    return proto.getClosed();
+  }
+
+  /**
+   * A utility to build a GetServerInfoResponse.
+   *
+   * @param serverName
+   * @param webuiPort
+   * @return the response
+   */
+  public static GetServerInfoResponse buildGetServerInfoResponse(
+      final ServerName serverName, final int webuiPort) {
+    GetServerInfoResponse.Builder builder = GetServerInfoResponse.newBuilder();
+    ServerInfo.Builder serverInfoBuilder = ServerInfo.newBuilder();
+    serverInfoBuilder.setServerName(ProtobufUtil.toServerName(serverName));
+    if (webuiPort >= 0) {
+      serverInfoBuilder.setWebuiPort(webuiPort);
+    }
+    builder.setServerInfo(serverInfoBuilder.build());
+    return builder.build();
+  }
+
+  /**
+   * A utility to build a GetOnlineRegionResponse.
+   *
+   * @param regions
+   * @return the response
+   */
+  public static GetOnlineRegionResponse buildGetOnlineRegionResponse(
+      final List<HRegionInfo> regions) {
+    GetOnlineRegionResponse.Builder builder = GetOnlineRegionResponse.newBuilder();
+    for (HRegionInfo region: regions) {
+      builder.addRegionInfo(HRegionInfo.convert(region));
+    }
+    return builder.build();
+  }
+
+  /**
+   * Creates a response for the catalog scan request
+   * @return A CatalogScanResponse
+   */
+  public static CatalogScanResponse buildCatalogScanResponse(int numCleaned) {
+    return CatalogScanResponse.newBuilder().setScanResult(numCleaned).build();
+  }
+
+  /**
+   * Creates a response for the catalog scan request
+   * @return A EnableCatalogJanitorResponse
+   */
+  public static EnableCatalogJanitorResponse buildEnableCatalogJanitorResponse(boolean prevValue) {
+    return EnableCatalogJanitorResponse.newBuilder().setPrevValue(prevValue).build();
+  }
+
+// End utilities for Admin
+
+  /**
+   * Creates a response for the last flushed sequence Id request
+   * @return A GetLastFlushedSequenceIdResponse
+   */
+  public static GetLastFlushedSequenceIdResponse buildGetLastFlushedSequenceIdResponse(
+      long seqId) {
+    return GetLastFlushedSequenceIdResponse.newBuilder().setLastFlushedSequenceId(seqId).build();
+  }
+
+  /**
+   * Stores an exception encountered during RPC invocation so it can be passed back
+   * through to the client.
+   * @param controller the controller instance provided by the client when calling the service
+   * @param ioe the exception encountered
+   */
+  public static void setControllerException(RpcController controller, IOException ioe) {
+    if (controller != null) {
+      if (controller instanceof ServerRpcController) {
+        ((ServerRpcController)controller).setFailedOn(ioe);
+      } else {
+        controller.setFailed(StringUtils.stringifyException(ioe));
+      }
+    }
+  }
+}

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/package.html
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/package.html?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/package.html (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/package.html Mon Feb 25 22:50:17 2013
@@ -0,0 +1,30 @@
+<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
+<html>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<head />
+<body bgcolor="white">
+Holds classes generated from the <a href="http://code.google.com/apis/protocolbuffers/">protobuf</a>
+definition files under <code>src/main/protobuf</code>.
+
+<p>See under <code>src/main/protobuf</code> for instructions on how to generate the content under
+the <code>generated</code> subpackage.
+</p>
+</body>
+</html>

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/BloomType.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/BloomType.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/BloomType.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/BloomType.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,35 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+/**
+ * The kinds of Bloom filter that can be selected, distinguished by what is
+ * used as the Bloom key.
+ */
+public enum BloomType {
+  /**
+   * Bloomfilters disabled
+   */
+  NONE,
+  /**
+   * Bloom enabled with Table row as Key
+   */
+  ROW,
+  /**
+   * Bloom enabled with Table row and column (family+qualifier) as Key
+   */
+  ROWCOL
+}
\ No newline at end of file

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/RegionOpeningState.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/RegionOpeningState.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/RegionOpeningState.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/RegionOpeningState.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,31 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * States reported for a region open attempt.
+ * <p>
+ * NOTE(review): ResponseConverter maps the protobuf opening state to this enum
+ * with {@code valueOf(name)}, so the constant names presumably must stay in
+ * sync with the protobuf enum - confirm before renaming.
+ */
+@InterfaceAudience.Private
+public enum RegionOpeningState {
+
+  // NOTE(review): meanings below are inferred from the constant names - confirm.
+
+  /** Presumably: the region was opened. */
+  OPENED,
+
+  /** Presumably: the region was already open. */
+  ALREADY_OPENED,
+
+  /** Presumably: opening the region failed. */
+  FAILED_OPENING;
+}

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,206 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This class acts as a wrapper for all the objects used to identify and
+ * communicate with remote peers and is responsible for answering to expired
+ * sessions and re-establishing the ZK connections.
+ */
+@InterfaceAudience.Private
+public class ReplicationPeer implements Abortable, Closeable {
+  private static final Log LOG = LogFactory.getLog(ReplicationPeer.class);
+
+  private final String clusterKey;
+  private final String id;
+  private List<ServerName> regionServers = new ArrayList<ServerName>(0);
+  private final AtomicBoolean peerEnabled = new AtomicBoolean();
+  // Cannot be final since a new object needs to be recreated when session fails
+  private ZooKeeperWatcher zkw;
+  private final Configuration conf;
+
+  private PeerStateTracker peerStateTracker;
+
+  /**
+   * Constructor that takes all the objects required to communicate with the
+   * specified peer, except for the region server addresses.
+   * @param conf configuration object to this peer
+   * @param key cluster key used to locate the peer
+   * @param id string representation of this peer's identifier
+   */
+  public ReplicationPeer(Configuration conf, String key,
+      String id) throws IOException {
+    this.conf = conf;
+    this.clusterKey = key;
+    this.id = id;
+    this.reloadZkWatcher();
+  }
+
+  /**
+   * start a state tracker to check whether this peer is enabled or not
+   *
+   * @param zookeeper zk watcher for the local cluster
+   * @param peerStateNode path to zk node which stores peer state
+   * @throws KeeperException
+   */
+  public void startStateTracker(ZooKeeperWatcher zookeeper, String peerStateNode)
+      throws KeeperException {
+    ReplicationZookeeper.ensurePeerEnabled(zookeeper, peerStateNode);
+    this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this);
+    this.peerStateTracker.start();
+    try {
+      this.readPeerStateZnode();
+    } catch (DeserializationException e) {
+      throw ZKUtil.convert(e);
+    }
+  }
+
+  private void readPeerStateZnode() throws DeserializationException {
+    this.peerEnabled.set(ReplicationZookeeper.isStateEnabled(this.peerStateTracker.getData(false)));
+  }
+
+  /**
+   * Get the cluster key of that peer
+   * @return string consisting of zk ensemble addresses, client port
+   * and root znode
+   */
+  public String getClusterKey() {
+    return clusterKey;
+  }
+
+  /**
+   * Get the state of this peer
+   * @return atomic boolean that holds the status
+   */
+  public AtomicBoolean getPeerEnabled() {
+    return peerEnabled;
+  }
+
+  /**
+   * Get a list of all the addresses of all the region servers
+   * for this peer cluster
+   * @return list of addresses
+   */
+  public List<ServerName> getRegionServers() {
+    return regionServers;
+  }
+
+  /**
+   * Set the list of region servers for that peer
+   * @param regionServers list of addresses for the region servers
+   */
+  public void setRegionServers(List<ServerName> regionServers) {
+    this.regionServers = regionServers;
+  }
+
+  /**
+   * Get the ZK connection to this peer
+   * @return zk connection
+   */
+  public ZooKeeperWatcher getZkw() {
+    return zkw;
+  }
+
+  /**
+   * Get the identifier of this peer
+   * @return string representation of the id (short)
+   */
+  public String getId() {
+    return id;
+  }
+
+  /**
+   * Get the configuration object required to communicate with this peer
+   * @return configuration object
+   */
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  @Override
+  public void abort(String why, Throwable e) {
+    LOG.fatal("The ReplicationPeer coresponding to peer " + clusterKey
+        + " was aborted for the following reason(s):" + why, e);
+  }
+
+  /**
+   * Closes the current ZKW (if not null) and creates a new one
+   * @throws IOException If anything goes wrong connecting
+   */
+  public void reloadZkWatcher() throws IOException {
+    if (zkw != null) zkw.close();
+    zkw = new ZooKeeperWatcher(conf,
+        "connection to cluster: " + id, this);
+  }
+
+  @Override
+  public boolean isAborted() {
+    // Currently the replication peer is never "Aborted", we just log when the
+    // abort method is called.
+    return false;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (zkw != null){
+      zkw.close();
+    }
+  }
+
+  /**
+   * Tracker for state of this peer
+   */
+  public class PeerStateTracker extends ZooKeeperNodeTracker {
+
+    public PeerStateTracker(String peerStateZNode, ZooKeeperWatcher watcher,
+        Abortable abortable) {
+      super(watcher, peerStateZNode, abortable);
+    }
+
+    @Override
+    public synchronized void nodeDataChanged(String path) {
+      if (path.equals(node)) {
+        super.nodeDataChanged(path);
+        try {
+          readPeerStateZnode();
+        } catch (DeserializationException e) {
+          LOG.warn("Failed deserializing the content of " + path, e);
+        }
+      }
+    }
+  }
+}

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,166 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * ReplicationStateImpl is responsible for maintaining the replication state
+ * znode. It tracks the znode, mirrors its content into the
+ * {@link AtomicBoolean} handed to the constructor and writes the znode when
+ * the state is set.
+ */
+public class ReplicationStateImpl implements ReplicationStateInterface {
+
+  private static final Log LOG = LogFactory.getLog(ReplicationStateImpl.class);
+
+  private final ReplicationStateTracker stateTracker;
+  private final String stateZnode;
+  private final ZooKeeperWatcher zookeeper;
+  private final Abortable abortable;
+  private final AtomicBoolean replicating;
+
+  /**
+   * Starts tracking the state znode and loads its current value into
+   * <code>replicating</code>.
+   * @param zk zk watcher used to read and write the state znode
+   * @param stateZnode path of the replication state znode
+   * @param abortable notified when the state cannot be read from ZooKeeper
+   * @param replicating flag kept in sync with the content of the state znode
+   */
+  public ReplicationStateImpl(final ZooKeeperWatcher zk, final String stateZnode,
+      final Abortable abortable, final AtomicBoolean replicating) {
+    this.zookeeper = zk;
+    this.stateZnode = stateZnode;
+    this.abortable = abortable;
+    this.replicating = replicating;
+
+    // Set a tracker on replicationStateNode
+    this.stateTracker = new ReplicationStateTracker(this.zookeeper, this.stateZnode,
+        this.abortable);
+    this.stateTracker.start();
+    readReplicationStateZnode();
+  }
+
+  @Override
+  public boolean getState() throws KeeperException {
+    return getReplication();
+  }
+
+  @Override
+  public void setState(boolean newState) throws KeeperException {
+    setReplicating(newState);
+  }
+
+  /**
+   * Stops the tracker watching the state znode.
+   */
+  @Override
+  public void close() throws IOException {
+    stateTracker.stop();
+  }
+
+  /**
+   * NOTE(review): ReplicationZookeeper.isStateEnabled is used elsewhere in
+   * this package for what looks like the same check; consider consolidating.
+   * @param bytes content of a state znode
+   * @return True if the passed in <code>bytes</code> are those of a pb
+   *         serialized ENABLED state.
+   * @throws DeserializationException if the bytes cannot be parsed
+   */
+  private boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
+    return ZooKeeperProtos.ReplicationState.State.ENABLED == parseStateFrom(bytes);
+  }
+
+  /**
+   * @param bytes Content of a state znode.
+   * @return State parsed from the passed bytes.
+   * @throws DeserializationException if the pb magic prefix check fails or
+   *         the remaining bytes are not a valid ReplicationState message
+   */
+  private ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
+      throws DeserializationException {
+    ProtobufUtil.expectPBMagicPrefix(bytes);
+    // The serialized message starts right after the magic prefix
+    int magicLength = ProtobufUtil.lengthOfPBMagic();
+    try {
+      return ZooKeeperProtos.ReplicationState.newBuilder()
+          .mergeFrom(bytes, magicLength, bytes.length - magicLength)
+          .build()
+          .getState();
+    } catch (InvalidProtocolBufferException e) {
+      throw new DeserializationException(e);
+    }
+  }
+
+  /**
+   * Set the new replication state for this cluster. The state znode is
+   * created together with its parents before the data is written.
+   * @param newState true to write the enabled state, false for disabled
+   * @throws KeeperException if creating or writing the znode fails
+   */
+  private void setReplicating(boolean newState) throws KeeperException {
+    ZKUtil.createWithParents(this.zookeeper, this.stateZnode);
+    byte[] stateBytes = newState ? ReplicationZookeeper.ENABLED_ZNODE_BYTES
+        : ReplicationZookeeper.DISABLED_ZNODE_BYTES;
+    ZKUtil.setData(this.zookeeper, this.stateZnode, stateBytes);
+  }
+
+  /**
+   * Get the replication status of this cluster. If the tracker holds no data
+   * for the state znode (null or empty), the znode is created and set to
+   * enabled.
+   * @return returns true when it's enabled, else false
+   * @throws KeeperException on a ZooKeeper failure, or when the content of the
+   *         znode cannot be deserialized
+   */
+  private boolean getReplication() throws KeeperException {
+    byte[] data = this.stateTracker.getData(false);
+    if (data == null || data.length == 0) {
+      setReplicating(true);
+      return true;
+    }
+    try {
+      return isStateEnabled(data);
+    } catch (DeserializationException e) {
+      throw ZKUtil.convert(e);
+    }
+  }
+
+  /**
+   * This reads the state znode for replication and sets the atomic boolean.
+   * A ZooKeeper failure is reported through the abortable instead of being
+   * thrown.
+   */
+  private void readReplicationStateZnode() {
+    try {
+      // Log the value that was just read rather than re-reading the shared flag
+      boolean enabled = getReplication();
+      this.replicating.set(enabled);
+      LOG.info("Replication is now " + (enabled ? "started" : "stopped"));
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed getting data from " + this.stateZnode, e);
+    }
+  }
+
+  /**
+   * Tracker for status of the replication
+   */
+  private class ReplicationStateTracker extends ZooKeeperNodeTracker {
+    public ReplicationStateTracker(ZooKeeperWatcher watcher, String stateZnode, Abortable abortable) {
+      super(watcher, stateZnode, abortable);
+    }
+
+    /**
+     * Reloads the replication flag when the tracked znode changes; events
+     * for other paths are ignored.
+     */
+    @Override
+    public synchronized void nodeDataChanged(String path) {
+      if (!path.equals(node)) {
+        return;
+      }
+      super.nodeDataChanged(path);
+      readReplicationStateZnode();
+    }
+  }
+}
\ No newline at end of file

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.zookeeper.KeeperException;
+
+import java.io.Closeable;
+
+/**
+ * Contract for reading and changing the replication state of a cluster. The
+ * state indicates whether replication is enabled or disabled on that cluster.
+ */
+public interface ReplicationStateInterface extends Closeable {
+
+  /**
+   * Get the current state of replication (i.e. ENABLED or DISABLED).
+   *
+   * @return true if replication is enabled, false otherwise
+   * @throws KeeperException on a ZooKeeper error
+   */
+  boolean getState() throws KeeperException;
+
+  /**
+   * Set the state of replication.
+   *
+   * @param newState true to enable replication, false to disable it
+   * @throws KeeperException on a ZooKeeper error
+   */
+  void setState(boolean newState) throws KeeperException;
+}
\ No newline at end of file