You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ec...@apache.org on 2013/02/25 23:50:29 UTC
svn commit: r1449950 [22/35] - in /hbase/trunk: ./ hbase-client/
hbase-client/src/ hbase-client/src/main/ hbase-client/src/main/java/
hbase-client/src/main/java/org/ hbase-client/src/main/java/org/apache/
hbase-client/src/main/java/org/apache/hadoop/ h...
Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,1141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.protobuf;
+
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Action;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
+import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Column;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiAction;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.ColumnValue;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.ColumnValue.QualifierValue;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.MutateType;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AddColumnRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AssignRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.BalanceRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CatalogScanRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CreateTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteColumnRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DisableTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyColumnRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MoveRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.OfflineRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.SetBalancerRunningRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.UnassignRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetClusterStatusRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetSchemaAlterStatusRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetTableDescriptorsRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Helper utility to build protocol buffer requests,
+ * or build components for protocol buffer requests.
+ */
+@InterfaceAudience.Private
+public final class RequestConverter {
+
+ private RequestConverter() {
+ }
+
+// Start utilities for Client
+
+/**
+ * Create a new protocol buffer GetRequest to get a row, all columns in a family.
+ * If there is no such row, return the closest row before it.
+ *
+ * @param regionName the name of the region to get
+ * @param row the row to get
+ * @param family the column family to get
+ * should return the immediate row before
+ * @return a protocol buffer GetReuqest
+ */
+ public static GetRequest buildGetRowOrBeforeRequest(
+ final byte[] regionName, final byte[] row, final byte[] family) {
+ GetRequest.Builder builder = GetRequest.newBuilder();
+ RegionSpecifier region = buildRegionSpecifier(
+ RegionSpecifierType.REGION_NAME, regionName);
+ builder.setClosestRowBefore(true);
+ builder.setRegion(region);
+
+ Column.Builder columnBuilder = Column.newBuilder();
+ columnBuilder.setFamily(ByteString.copyFrom(family));
+ ClientProtos.Get.Builder getBuilder =
+ ClientProtos.Get.newBuilder();
+ getBuilder.setRow(ByteString.copyFrom(row));
+ getBuilder.addColumn(columnBuilder.build());
+ builder.setGet(getBuilder.build());
+ return builder.build();
+ }
+
+ /**
+ * Create a protocol buffer GetRequest for a client Get
+ *
+ * @param regionName the name of the region to get
+ * @param get the client Get
+ * @return a protocol buffer GetReuqest
+ */
+ public static GetRequest buildGetRequest(final byte[] regionName,
+ final Get get) throws IOException {
+ return buildGetRequest(regionName, get, false);
+ }
+
+ /**
+ * Create a protocol buffer GetRequest for a client Get
+ *
+ * @param regionName the name of the region to get
+ * @param get the client Get
+ * @param existenceOnly indicate if check row existence only
+ * @return a protocol buffer GetRequest
+ */
+ public static GetRequest buildGetRequest(final byte[] regionName,
+ final Get get, final boolean existenceOnly) throws IOException {
+ GetRequest.Builder builder = GetRequest.newBuilder();
+ RegionSpecifier region = buildRegionSpecifier(
+ RegionSpecifierType.REGION_NAME, regionName);
+ builder.setExistenceOnly(existenceOnly);
+ builder.setRegion(region);
+ builder.setGet(ProtobufUtil.toGet(get));
+ return builder.build();
+ }
+
+ /**
+ * Create a protocol buffer MultiGetRequest for client Gets All gets are going to be run against
+ * the same region.
+ * @param regionName the name of the region to get from
+ * @param gets the client Gets
+ * @param existenceOnly indicate if check rows existence only
+ * @return a protocol buffer MultiGetRequest
+ */
+ public static MultiGetRequest buildMultiGetRequest(final byte[] regionName, final List<Get> gets,
+ final boolean existenceOnly, final boolean closestRowBefore) throws IOException {
+ MultiGetRequest.Builder builder = MultiGetRequest.newBuilder();
+ RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
+ builder.setExistenceOnly(existenceOnly);
+ builder.setClosestRowBefore(closestRowBefore);
+ builder.setRegion(region);
+ for (Get get : gets) {
+ builder.addGet(ProtobufUtil.toGet(get));
+ }
+ return builder.build();
+ }
+
+ /**
+ * Create a protocol buffer MutateRequest for a client increment
+ *
+ * @param regionName
+ * @param row
+ * @param family
+ * @param qualifier
+ * @param amount
+ * @param writeToWAL
+ * @return a mutate request
+ */
+ public static MutateRequest buildMutateRequest(
+ final byte[] regionName, final byte[] row, final byte[] family,
+ final byte [] qualifier, final long amount, final boolean writeToWAL) {
+ MutateRequest.Builder builder = MutateRequest.newBuilder();
+ RegionSpecifier region = buildRegionSpecifier(
+ RegionSpecifierType.REGION_NAME, regionName);
+ builder.setRegion(region);
+
+ Mutate.Builder mutateBuilder = Mutate.newBuilder();
+ mutateBuilder.setRow(ByteString.copyFrom(row));
+ mutateBuilder.setMutateType(MutateType.INCREMENT);
+ mutateBuilder.setWriteToWAL(writeToWAL);
+ ColumnValue.Builder columnBuilder = ColumnValue.newBuilder();
+ columnBuilder.setFamily(ByteString.copyFrom(family));
+ QualifierValue.Builder valueBuilder = QualifierValue.newBuilder();
+ valueBuilder.setValue(ByteString.copyFrom(Bytes.toBytes(amount)));
+ valueBuilder.setQualifier(ByteString.copyFrom(qualifier));
+ columnBuilder.addQualifierValue(valueBuilder.build());
+ mutateBuilder.addColumnValue(columnBuilder.build());
+
+ builder.setMutate(mutateBuilder.build());
+ return builder.build();
+ }
+
+ /**
+ * Create a protocol buffer MutateRequest for a conditioned put
+ *
+ * @param regionName
+ * @param row
+ * @param family
+ * @param qualifier
+ * @param comparator
+ * @param compareType
+ * @param put
+ * @return a mutate request
+ * @throws IOException
+ */
+ public static MutateRequest buildMutateRequest(
+ final byte[] regionName, final byte[] row, final byte[] family,
+ final byte [] qualifier, final ByteArrayComparable comparator,
+ final CompareType compareType, final Put put) throws IOException {
+ MutateRequest.Builder builder = MutateRequest.newBuilder();
+ RegionSpecifier region = buildRegionSpecifier(
+ RegionSpecifierType.REGION_NAME, regionName);
+ builder.setRegion(region);
+ Condition condition = buildCondition(
+ row, family, qualifier, comparator, compareType);
+ builder.setMutate(ProtobufUtil.toMutate(MutateType.PUT, put));
+ builder.setCondition(condition);
+ return builder.build();
+ }
+
+ /**
+ * Create a protocol buffer MutateRequest for a conditioned delete
+ *
+ * @param regionName
+ * @param row
+ * @param family
+ * @param qualifier
+ * @param comparator
+ * @param compareType
+ * @param delete
+ * @return a mutate request
+ * @throws IOException
+ */
+ public static MutateRequest buildMutateRequest(
+ final byte[] regionName, final byte[] row, final byte[] family,
+ final byte [] qualifier, final ByteArrayComparable comparator,
+ final CompareType compareType, final Delete delete) throws IOException {
+ MutateRequest.Builder builder = MutateRequest.newBuilder();
+ RegionSpecifier region = buildRegionSpecifier(
+ RegionSpecifierType.REGION_NAME, regionName);
+ builder.setRegion(region);
+ Condition condition = buildCondition(
+ row, family, qualifier, comparator, compareType);
+ builder.setMutate(ProtobufUtil.toMutate(MutateType.DELETE, delete));
+ builder.setCondition(condition);
+ return builder.build();
+ }
+
+ /**
+ * Create a protocol buffer MutateRequest for a put
+ *
+ * @param regionName
+ * @param put
+ * @return a mutate request
+ * @throws IOException
+ */
+ public static MutateRequest buildMutateRequest(
+ final byte[] regionName, final Put put) throws IOException {
+ MutateRequest.Builder builder = MutateRequest.newBuilder();
+ RegionSpecifier region = buildRegionSpecifier(
+ RegionSpecifierType.REGION_NAME, regionName);
+ builder.setRegion(region);
+ builder.setMutate(ProtobufUtil.toMutate(MutateType.PUT, put));
+ return builder.build();
+ }
+
+ /**
+ * Create a protocol buffer MutateRequest for an append
+ *
+ * @param regionName
+ * @param append
+ * @return a mutate request
+ * @throws IOException
+ */
+ public static MutateRequest buildMutateRequest(
+ final byte[] regionName, final Append append) throws IOException {
+ MutateRequest.Builder builder = MutateRequest.newBuilder();
+ RegionSpecifier region = buildRegionSpecifier(
+ RegionSpecifierType.REGION_NAME, regionName);
+ builder.setRegion(region);
+ builder.setMutate(ProtobufUtil.toMutate(MutateType.APPEND, append));
+ return builder.build();
+ }
+
+ /**
+ * Create a protocol buffer MutateRequest for a client increment
+ *
+ * @param regionName
+ * @param increment
+ * @return a mutate request
+ */
+ public static MutateRequest buildMutateRequest(
+ final byte[] regionName, final Increment increment) {
+ MutateRequest.Builder builder = MutateRequest.newBuilder();
+ RegionSpecifier region = buildRegionSpecifier(
+ RegionSpecifierType.REGION_NAME, regionName);
+ builder.setRegion(region);
+ builder.setMutate(ProtobufUtil.toMutate(increment));
+ return builder.build();
+ }
+
+ /**
+ * Create a protocol buffer MutateRequest for a delete
+ *
+ * @param regionName
+ * @param delete
+ * @return a mutate request
+ * @throws IOException
+ */
+ public static MutateRequest buildMutateRequest(
+ final byte[] regionName, final Delete delete) throws IOException {
+ MutateRequest.Builder builder = MutateRequest.newBuilder();
+ RegionSpecifier region = buildRegionSpecifier(
+ RegionSpecifierType.REGION_NAME, regionName);
+ builder.setRegion(region);
+ builder.setMutate(ProtobufUtil.toMutate(MutateType.DELETE, delete));
+ return builder.build();
+ }
+
+ /**
+ * Create a protocol buffer MultiRequest for a row mutations
+ *
+ * @param regionName
+ * @param rowMutations
+ * @return a multi request
+ * @throws IOException
+ */
+ public static MultiRequest buildMultiRequest(final byte[] regionName,
+ final RowMutations rowMutations) throws IOException {
+ MultiRequest.Builder builder = MultiRequest.newBuilder();
+ RegionSpecifier region = buildRegionSpecifier(
+ RegionSpecifierType.REGION_NAME, regionName);
+ builder.setRegion(region);
+ builder.setAtomic(true);
+ for (Mutation mutation: rowMutations.getMutations()) {
+ MutateType mutateType = null;
+ if (mutation instanceof Put) {
+ mutateType = MutateType.PUT;
+ } else if (mutation instanceof Delete) {
+ mutateType = MutateType.DELETE;
+ } else {
+ throw new DoNotRetryIOException(
+ "RowMutations supports only put and delete, not "
+ + mutation.getClass().getName());
+ }
+ Mutate mutate = ProtobufUtil.toMutate(mutateType, mutation);
+ builder.addAction(MultiAction.newBuilder().setMutate(mutate).build());
+ }
+ return builder.build();
+ }
+
+ /**
+ * Create a protocol buffer ScanRequest for a client Scan
+ *
+ * @param regionName
+ * @param scan
+ * @param numberOfRows
+ * @param closeScanner
+ * @return a scan request
+ * @throws IOException
+ */
+ public static ScanRequest buildScanRequest(final byte[] regionName,
+ final Scan scan, final int numberOfRows,
+ final boolean closeScanner) throws IOException {
+ ScanRequest.Builder builder = ScanRequest.newBuilder();
+ RegionSpecifier region = buildRegionSpecifier(
+ RegionSpecifierType.REGION_NAME, regionName);
+ builder.setNumberOfRows(numberOfRows);
+ builder.setCloseScanner(closeScanner);
+ builder.setRegion(region);
+ builder.setScan(ProtobufUtil.toScan(scan));
+ return builder.build();
+ }
+
+ /**
+ * Create a protocol buffer ScanRequest for a scanner id
+ *
+ * @param scannerId
+ * @param numberOfRows
+ * @param closeScanner
+ * @return a scan request
+ */
+ public static ScanRequest buildScanRequest(final long scannerId,
+ final int numberOfRows, final boolean closeScanner) {
+ ScanRequest.Builder builder = ScanRequest.newBuilder();
+ builder.setNumberOfRows(numberOfRows);
+ builder.setCloseScanner(closeScanner);
+ builder.setScannerId(scannerId);
+ return builder.build();
+ }
+
+ /**
+ * Create a protocol buffer ScanRequest for a scanner id
+ *
+ * @param scannerId
+ * @param numberOfRows
+ * @param closeScanner
+ * @param nextCallSeq
+ * @return a scan request
+ */
+ public static ScanRequest buildScanRequest(final long scannerId, final int numberOfRows,
+ final boolean closeScanner, final long nextCallSeq) {
+ ScanRequest.Builder builder = ScanRequest.newBuilder();
+ builder.setNumberOfRows(numberOfRows);
+ builder.setCloseScanner(closeScanner);
+ builder.setScannerId(scannerId);
+ builder.setNextCallSeq(nextCallSeq);
+ return builder.build();
+ }
+
+ /**
+ * Create a protocol buffer bulk load request
+ *
+ * @param familyPaths
+ * @param regionName
+ * @param assignSeqNum
+ * @return a bulk load request
+ */
+ public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
+ final List<Pair<byte[], String>> familyPaths,
+ final byte[] regionName, boolean assignSeqNum) {
+ BulkLoadHFileRequest.Builder builder = BulkLoadHFileRequest.newBuilder();
+ RegionSpecifier region = buildRegionSpecifier(
+ RegionSpecifierType.REGION_NAME, regionName);
+ builder.setRegion(region);
+ FamilyPath.Builder familyPathBuilder = FamilyPath.newBuilder();
+ for (Pair<byte[], String> familyPath: familyPaths) {
+ familyPathBuilder.setFamily(ByteString.copyFrom(familyPath.getFirst()));
+ familyPathBuilder.setPath(familyPath.getSecond());
+ builder.addFamilyPath(familyPathBuilder.build());
+ }
+ builder.setAssignSeqNum(assignSeqNum);
+ return builder.build();
+ }
+
+ /**
+ * Create a protocol buffer multi request for a list of actions.
+ * RowMutations in the list (if any) will be ignored.
+ *
+ * @param regionName
+ * @param actions
+ * @return a multi request
+ * @throws IOException
+ */
+ public static <R> MultiRequest buildMultiRequest(final byte[] regionName,
+ final List<Action<R>> actions) throws IOException {
+ MultiRequest.Builder builder = MultiRequest.newBuilder();
+ RegionSpecifier region = buildRegionSpecifier(
+ RegionSpecifierType.REGION_NAME, regionName);
+ builder.setRegion(region);
+ for (Action<R> action: actions) {
+ MultiAction.Builder protoAction = MultiAction.newBuilder();
+
+ Row row = action.getAction();
+ if (row instanceof Get) {
+ protoAction.setGet(ProtobufUtil.toGet((Get)row));
+ } else if (row instanceof Put) {
+ protoAction.setMutate(ProtobufUtil.toMutate(MutateType.PUT, (Put)row));
+ } else if (row instanceof Delete) {
+ protoAction.setMutate(ProtobufUtil.toMutate(MutateType.DELETE, (Delete)row));
+ } else if (row instanceof Append) {
+ protoAction.setMutate(ProtobufUtil.toMutate(MutateType.APPEND, (Append)row));
+ } else if (row instanceof Increment) {
+ protoAction.setMutate(ProtobufUtil.toMutate((Increment)row));
+ } else if (row instanceof RowMutations) {
+ continue; // ignore RowMutations
+ } else {
+ throw new DoNotRetryIOException(
+ "multi doesn't support " + row.getClass().getName());
+ }
+ builder.addAction(protoAction.build());
+ }
+ return builder.build();
+ }
+
+// End utilities for Client
+//Start utilities for Admin
+
+ /**
+ * Create a protocol buffer GetRegionInfoRequest for a given region name
+ *
+ * @param regionName the name of the region to get info
+ * @return a protocol buffer GetRegionInfoRequest
+ */
+ public static GetRegionInfoRequest
+ buildGetRegionInfoRequest(final byte[] regionName) {
+ return buildGetRegionInfoRequest(regionName, false);
+ }
+
+ /**
+ * Create a protocol buffer GetRegionInfoRequest for a given region name
+ *
+ * @param regionName the name of the region to get info
+ * @param includeCompactionState indicate if the compaction state is requested
+ * @return a protocol buffer GetRegionInfoRequest
+ */
+ public static GetRegionInfoRequest
+ buildGetRegionInfoRequest(final byte[] regionName,
+ final boolean includeCompactionState) {
+ GetRegionInfoRequest.Builder builder = GetRegionInfoRequest.newBuilder();
+ RegionSpecifier region = buildRegionSpecifier(
+ RegionSpecifierType.REGION_NAME, regionName);
+ builder.setRegion(region);
+ if (includeCompactionState) {
+ builder.setCompactionState(includeCompactionState);
+ }
+ return builder.build();
+ }
+
+ /**
+ * Create a protocol buffer GetStoreFileRequest for a given region name
+ *
+ * @param regionName the name of the region to get info
+ * @param family the family to get store file list
+ * @return a protocol buffer GetStoreFileRequest
+ */
+ public static GetStoreFileRequest
+ buildGetStoreFileRequest(final byte[] regionName, final byte[] family) {
+ GetStoreFileRequest.Builder builder = GetStoreFileRequest.newBuilder();
+ RegionSpecifier region = buildRegionSpecifier(
+ RegionSpecifierType.REGION_NAME, regionName);
+ builder.setRegion(region);
+ builder.addFamily(ByteString.copyFrom(family));
+ return builder.build();
+ }
+
+ /**
+ * Create a protocol buffer GetOnlineRegionRequest
+ *
+ * @return a protocol buffer GetOnlineRegionRequest
+ */
+ public static GetOnlineRegionRequest buildGetOnlineRegionRequest() {
+ return GetOnlineRegionRequest.newBuilder().build();
+ }
+
+ /**
+ * Create a protocol buffer FlushRegionRequest for a given region name
+ *
+ * @param regionName the name of the region to get info
+ * @return a protocol buffer FlushRegionRequest
+ */
+ public static FlushRegionRequest
+ buildFlushRegionRequest(final byte[] regionName) {
+ FlushRegionRequest.Builder builder = FlushRegionRequest.newBuilder();
+ RegionSpecifier region = buildRegionSpecifier(
+ RegionSpecifierType.REGION_NAME, regionName);
+ builder.setRegion(region);
+ return builder.build();
+ }
+
+ /**
+ * Create a protocol buffer OpenRegionRequest to open a list of regions
+ *
+ * @param regionOpenInfos info of a list of regions to open
+ * @return a protocol buffer OpenRegionRequest
+ */
+ public static OpenRegionRequest
+ buildOpenRegionRequest(final List<Pair<HRegionInfo, Integer>> regionOpenInfos) {
+ OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder();
+ for (Pair<HRegionInfo, Integer> regionOpenInfo: regionOpenInfos) {
+ Integer second = regionOpenInfo.getSecond();
+ int versionOfOfflineNode = second == null ? -1 : second.intValue();
+ builder.addOpenInfo(buildRegionOpenInfo(
+ regionOpenInfo.getFirst(), versionOfOfflineNode));
+ }
+ return builder.build();
+ }
+
+ /**
+ * Create a protocol buffer OpenRegionRequest for a given region
+ *
+ * @param region the region to open
+ * @param versionOfOfflineNode that needs to be present in the offline node
+ * @return a protocol buffer OpenRegionRequest
+ */
+ public static OpenRegionRequest buildOpenRegionRequest(
+ final HRegionInfo region, final int versionOfOfflineNode) {
+ OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder();
+ builder.addOpenInfo(buildRegionOpenInfo(region, versionOfOfflineNode));
+ return builder.build();
+ }
+
+ /**
+ * Create a CloseRegionRequest for a given region name
+ *
+ * @param regionName the name of the region to close
+ * @param transitionInZK indicator if to transition in ZK
+ * @return a CloseRegionRequest
+ */
+ public static CloseRegionRequest buildCloseRegionRequest(
+ final byte[] regionName, final boolean transitionInZK) {
+ CloseRegionRequest.Builder builder = CloseRegionRequest.newBuilder();
+ RegionSpecifier region = buildRegionSpecifier(
+ RegionSpecifierType.REGION_NAME, regionName);
+ builder.setRegion(region);
+ builder.setTransitionInZK(transitionInZK);
+ return builder.build();
+ }
+
+ public static CloseRegionRequest buildCloseRegionRequest(
+ final byte[] regionName, final int versionOfClosingNode,
+ ServerName destinationServer, final boolean transitionInZK) {
+ CloseRegionRequest.Builder builder = CloseRegionRequest.newBuilder();
+ RegionSpecifier region = buildRegionSpecifier(
+ RegionSpecifierType.REGION_NAME, regionName);
+ builder.setRegion(region);
+ builder.setVersionOfClosingNode(versionOfClosingNode);
+ builder.setTransitionInZK(transitionInZK);
+ if (destinationServer != null){
+ builder.setDestinationServer(ProtobufUtil.toServerName( destinationServer) );
+ }
+ return builder.build();
+ }
+
+ /**
+ * Create a CloseRegionRequest for a given encoded region name
+ *
+ * @param encodedRegionName the name of the region to close
+ * @param transitionInZK indicator if to transition in ZK
+ * @return a CloseRegionRequest
+ */
+ public static CloseRegionRequest
+ buildCloseRegionRequest(final String encodedRegionName,
+ final boolean transitionInZK) {
+ CloseRegionRequest.Builder builder = CloseRegionRequest.newBuilder();
+ RegionSpecifier region = buildRegionSpecifier(
+ RegionSpecifierType.ENCODED_REGION_NAME,
+ Bytes.toBytes(encodedRegionName));
+ builder.setRegion(region);
+ builder.setTransitionInZK(transitionInZK);
+ return builder.build();
+ }
+
+ /**
+ * Create a SplitRegionRequest for a given region name
+ *
+ * @param regionName the name of the region to split
+ * @param splitPoint the split point
+ * @return a SplitRegionRequest
+ */
+ public static SplitRegionRequest buildSplitRegionRequest(
+ final byte[] regionName, final byte[] splitPoint) {
+ SplitRegionRequest.Builder builder = SplitRegionRequest.newBuilder();
+ RegionSpecifier region = buildRegionSpecifier(
+ RegionSpecifierType.REGION_NAME, regionName);
+ builder.setRegion(region);
+ if (splitPoint != null) {
+ builder.setSplitPoint(ByteString.copyFrom(splitPoint));
+ }
+ return builder.build();
+ }
+
+ /**
+ * Create a CompactRegionRequest for a given region name
+ *
+ * @param regionName the name of the region to get info
+ * @param major indicator if it is a major compaction
+ * @return a CompactRegionRequest
+ */
+ public static CompactRegionRequest buildCompactRegionRequest(
+ final byte[] regionName, final boolean major, final byte [] family) {
+ CompactRegionRequest.Builder builder = CompactRegionRequest.newBuilder();
+ RegionSpecifier region = buildRegionSpecifier(
+ RegionSpecifierType.REGION_NAME, regionName);
+ builder.setRegion(region);
+ builder.setMajor(major);
+ if (family != null) {
+ builder.setFamily(ByteString.copyFrom(family));
+ }
+ return builder.build();
+ }
+
+ /**
+ * Create a new RollWALWriterRequest
+ *
+ * @return a ReplicateWALEntryRequest
+ */
+ public static RollWALWriterRequest buildRollWALWriterRequest() {
+ RollWALWriterRequest.Builder builder = RollWALWriterRequest.newBuilder();
+ return builder.build();
+ }
+
+ /**
+ * Create a new GetServerInfoRequest
+ *
+ * @return a GetServerInfoRequest
+ */
+ public static GetServerInfoRequest buildGetServerInfoRequest() {
+ GetServerInfoRequest.Builder builder = GetServerInfoRequest.newBuilder();
+ return builder.build();
+ }
+
+ /**
+ * Create a new StopServerRequest
+ *
+ * @param reason the reason to stop the server
+ * @return a StopServerRequest
+ */
+ public static StopServerRequest buildStopServerRequest(final String reason) {
+ StopServerRequest.Builder builder = StopServerRequest.newBuilder();
+ builder.setReason(reason);
+ return builder.build();
+ }
+
+//End utilities for Admin
+
+ /**
+ * Convert a byte array to a protocol buffer RegionSpecifier
+ *
+ * @param type the region specifier type
+ * @param value the region specifier byte array value
+ * @return a protocol buffer RegionSpecifier
+ */
+ public static RegionSpecifier buildRegionSpecifier(
+ final RegionSpecifierType type, final byte[] value) {
+ RegionSpecifier.Builder regionBuilder = RegionSpecifier.newBuilder();
+ regionBuilder.setValue(ByteString.copyFrom(value));
+ regionBuilder.setType(type);
+ return regionBuilder.build();
+ }
+
+ /**
+ * Create a protocol buffer Condition
+ *
+ * @param row
+ * @param family
+ * @param qualifier
+ * @param comparator
+ * @param compareType
+ * @return a Condition
+ * @throws IOException
+ */
+ private static Condition buildCondition(final byte[] row,
+ final byte[] family, final byte [] qualifier,
+ final ByteArrayComparable comparator,
+ final CompareType compareType) throws IOException {
+ Condition.Builder builder = Condition.newBuilder();
+ builder.setRow(ByteString.copyFrom(row));
+ builder.setFamily(ByteString.copyFrom(family));
+ builder.setQualifier(ByteString.copyFrom(qualifier));
+ builder.setComparator(ProtobufUtil.toComparator(comparator));
+ builder.setCompareType(compareType);
+ return builder.build();
+ }
+
+ /**
+ * Create a protocol buffer AddColumnRequest
+ *
+ * @param tableName
+ * @param column
+ * @return an AddColumnRequest
+ */
+ public static AddColumnRequest buildAddColumnRequest(
+ final byte [] tableName, final HColumnDescriptor column) {
+ AddColumnRequest.Builder builder = AddColumnRequest.newBuilder();
+ builder.setTableName(ByteString.copyFrom(tableName));
+ builder.setColumnFamilies(column.convert());
+ return builder.build();
+ }
+
+ /**
+ * Create a protocol buffer DeleteColumnRequest
+ *
+ * @param tableName
+ * @param columnName
+ * @return a DeleteColumnRequest
+ */
+ public static DeleteColumnRequest buildDeleteColumnRequest(
+ final byte [] tableName, final byte [] columnName) {
+ DeleteColumnRequest.Builder builder = DeleteColumnRequest.newBuilder();
+ builder.setTableName(ByteString.copyFrom(tableName));
+ builder.setColumnName(ByteString.copyFrom(columnName));
+ return builder.build();
+ }
+
+ /**
+ * Create a protocol buffer ModifyColumnRequest
+ *
+ * @param tableName
+ * @param column
+ * @return an ModifyColumnRequest
+ */
+ public static ModifyColumnRequest buildModifyColumnRequest(
+ final byte [] tableName, final HColumnDescriptor column) {
+ ModifyColumnRequest.Builder builder = ModifyColumnRequest.newBuilder();
+ builder.setTableName(ByteString.copyFrom(tableName));
+ builder.setColumnFamilies(column.convert());
+ return builder.build();
+ }
+
+ /**
+ * Create a protocol buffer MoveRegionRequest
+ *
+ * @param encodedRegionName
+ * @param destServerName
+ * @return A MoveRegionRequest
+ * @throws DeserializationException
+ */
+ public static MoveRegionRequest buildMoveRegionRequest(
+ final byte [] encodedRegionName, final byte [] destServerName) throws
+ DeserializationException {
+ MoveRegionRequest.Builder builder = MoveRegionRequest.newBuilder();
+ builder.setRegion(
+ buildRegionSpecifier(RegionSpecifierType.ENCODED_REGION_NAME,encodedRegionName));
+ if (destServerName != null) {
+ builder.setDestServerName(
+ ProtobufUtil.toServerName(new ServerName(Bytes.toString(destServerName))));
+ }
+ return builder.build();
+ }
+
+ /**
+ * Create a protocol buffer AssignRegionRequest
+ *
+ * @param regionName
+ * @return an AssignRegionRequest
+ */
+ public static AssignRegionRequest buildAssignRegionRequest(final byte [] regionName) {
+ AssignRegionRequest.Builder builder = AssignRegionRequest.newBuilder();
+ builder.setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME,regionName));
+ return builder.build();
+ }
+
+ /**
+ * Creates a protocol buffer UnassignRegionRequest
+ *
+ * @param regionName
+ * @param force
+ * @return an UnassignRegionRequest
+ */
+ public static UnassignRegionRequest buildUnassignRegionRequest(
+ final byte [] regionName, final boolean force) {
+ UnassignRegionRequest.Builder builder = UnassignRegionRequest.newBuilder();
+ builder.setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME,regionName));
+ builder.setForce(force);
+ return builder.build();
+ }
+
+ /**
+ * Creates a protocol buffer OfflineRegionRequest
+ *
+ * @param regionName
+ * @return an OfflineRegionRequest
+ */
+ public static OfflineRegionRequest buildOfflineRegionRequest(final byte [] regionName) {
+ OfflineRegionRequest.Builder builder = OfflineRegionRequest.newBuilder();
+ builder.setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME,regionName));
+ return builder.build();
+ }
+
+ /**
+ * Creates a protocol buffer DeleteTableRequest
+ *
+ * @param tableName
+ * @return a DeleteTableRequest
+ */
+ public static DeleteTableRequest buildDeleteTableRequest(final byte [] tableName) {
+ DeleteTableRequest.Builder builder = DeleteTableRequest.newBuilder();
+ builder.setTableName(ByteString.copyFrom(tableName));
+ return builder.build();
+ }
+
+ /**
+ * Creates a protocol buffer EnableTableRequest
+ *
+ * @param tableName
+ * @return an EnableTableRequest
+ */
+ public static EnableTableRequest buildEnableTableRequest(final byte [] tableName) {
+ EnableTableRequest.Builder builder = EnableTableRequest.newBuilder();
+ builder.setTableName(ByteString.copyFrom(tableName));
+ return builder.build();
+ }
+
+ /**
+ * Creates a protocol buffer DisableTableRequest
+ *
+ * @param tableName
+ * @return a DisableTableRequest
+ */
+ public static DisableTableRequest buildDisableTableRequest(final byte [] tableName) {
+ DisableTableRequest.Builder builder = DisableTableRequest.newBuilder();
+ builder.setTableName(ByteString.copyFrom(tableName));
+ return builder.build();
+ }
+
+ /**
+ * Creates a protocol buffer CreateTableRequest
+ *
+ * @param hTableDesc
+ * @param splitKeys
+ * @return a CreateTableRequest
+ */
+ public static CreateTableRequest buildCreateTableRequest(
+ final HTableDescriptor hTableDesc, final byte [][] splitKeys) {
+ CreateTableRequest.Builder builder = CreateTableRequest.newBuilder();
+ builder.setTableSchema(hTableDesc.convert());
+ if (splitKeys != null) {
+ for (byte [] splitKey : splitKeys) {
+ builder.addSplitKeys(ByteString.copyFrom(splitKey));
+ }
+ }
+ return builder.build();
+ }
+
+
+ /**
+ * Creates a protocol buffer ModifyTableRequest
+ *
+ * @param table
+ * @param hTableDesc
+ * @return a ModifyTableRequest
+ */
+ public static ModifyTableRequest buildModifyTableRequest(
+ final byte [] table, final HTableDescriptor hTableDesc) {
+ ModifyTableRequest.Builder builder = ModifyTableRequest.newBuilder();
+ builder.setTableName(ByteString.copyFrom(table));
+ builder.setTableSchema(hTableDesc.convert());
+ return builder.build();
+ }
+
+ /**
+ * Creates a protocol buffer GetSchemaAlterStatusRequest
+ *
+ * @param tableName
+ * @return a GetSchemaAlterStatusRequest
+ */
+ public static GetSchemaAlterStatusRequest buildGetSchemaAlterStatusRequest(
+ final byte [] tableName) {
+ GetSchemaAlterStatusRequest.Builder builder = GetSchemaAlterStatusRequest.newBuilder();
+ builder.setTableName(ByteString.copyFrom(tableName));
+ return builder.build();
+ }
+
+ /**
+ * Creates a protocol buffer GetTableDescriptorsRequest
+ *
+ * @param tableNames
+ * @return a GetTableDescriptorsRequest
+ */
+ public static GetTableDescriptorsRequest buildGetTableDescriptorsRequest(
+ final List<String> tableNames) {
+ GetTableDescriptorsRequest.Builder builder = GetTableDescriptorsRequest.newBuilder();
+ if (tableNames != null) {
+ for (String str : tableNames) {
+ builder.addTableNames(str);
+ }
+ }
+ return builder.build();
+ }
+
+ /**
+ * Creates a protocol buffer IsMasterRunningRequest
+ *
+ * @return a IsMasterRunningRequest
+ */
+ public static IsMasterRunningRequest buildIsMasterRunningRequest() {
+ return IsMasterRunningRequest.newBuilder().build();
+ }
+
+ /**
+ * Creates a protocol buffer BalanceRequest
+ *
+ * @return a BalanceRequest
+ */
+ public static BalanceRequest buildBalanceRequest() {
+ return BalanceRequest.newBuilder().build();
+ }
+
+ /**
+ * Creates a protocol buffer SetBalancerRunningRequest
+ *
+ * @param on
+ * @param synchronous
+ * @return a SetBalancerRunningRequest
+ */
+ public static SetBalancerRunningRequest buildSetBalancerRunningRequest(boolean on, boolean synchronous) {
+ return SetBalancerRunningRequest.newBuilder().setOn(on).setSynchronous(synchronous).build();
+ }
+
+ /**
+ * Creates a protocol buffer GetClusterStatusRequest
+ *
+ * @return A GetClusterStatusRequest
+ */
+ public static GetClusterStatusRequest buildGetClusterStatusRequest() {
+ return GetClusterStatusRequest.newBuilder().build();
+ }
+
+ /**
+ * Creates a request for running a catalog scan
+ * @return A {@link CatalogScanRequest}
+ */
+ public static CatalogScanRequest buildCatalogScanRequest() {
+ return CatalogScanRequest.newBuilder().build();
+ }
+
+ /**
+ * Creates a request for enabling/disabling the catalog janitor
+ * @return A {@link EnableCatalogJanitorRequest}
+ */
+ public static EnableCatalogJanitorRequest buildEnableCatalogJanitorRequest(boolean enable) {
+ return EnableCatalogJanitorRequest.newBuilder().setEnable(enable).build();
+ }
+
+ /**
+ * Creates a request for querying the master whether the catalog janitor is enabled
+ * @return A {@link IsCatalogJanitorEnabledRequest}
+ */
+ public static IsCatalogJanitorEnabledRequest buildIsCatalogJanitorEnabledRequest() {
+ return IsCatalogJanitorEnabledRequest.newBuilder().build();
+ }
+
+ /**
+ * Creates a request for querying the master the last flushed sequence Id for a region
+ * @param regionName
+ * @return A {@link GetLastFlushedSequenceIdRequest}
+ */
+ public static GetLastFlushedSequenceIdRequest buildGetLastFlushedSequenceIdRequest(
+ byte[] regionName) {
+ return GetLastFlushedSequenceIdRequest.newBuilder().setRegionName(
+ ByteString.copyFrom(regionName)).build();
+ }
+
+ /**
+ * Create a request to grant user permissions.
+ *
+ * @param username the short user name who to grant permissions
+ * @param table optional table name the permissions apply
+ * @param family optional column family
+ * @param qualifier optional qualifier
+ * @param actions the permissions to be granted
+ * @return A {@link AccessControlProtos} GrantRequest
+ */
+ public static AccessControlProtos.GrantRequest buildGrantRequest(
+ String username, byte[] table, byte[] family, byte[] qualifier,
+ AccessControlProtos.Permission.Action... actions) {
+ AccessControlProtos.Permission.Builder permissionBuilder =
+ AccessControlProtos.Permission.newBuilder();
+ for (AccessControlProtos.Permission.Action a : actions) {
+ permissionBuilder.addAction(a);
+ }
+ if (table != null) {
+ permissionBuilder.setTable(ByteString.copyFrom(table));
+ }
+ if (family != null) {
+ permissionBuilder.setFamily(ByteString.copyFrom(family));
+ }
+ if (qualifier != null) {
+ permissionBuilder.setQualifier(ByteString.copyFrom(qualifier));
+ }
+
+ return AccessControlProtos.GrantRequest.newBuilder()
+ .setPermission(
+ AccessControlProtos.UserPermission.newBuilder()
+ .setUser(ByteString.copyFromUtf8(username))
+ .setPermission(permissionBuilder.build())
+ ).build();
+ }
+
+ /**
+ * Create a request to revoke user permissions.
+ *
+ * @param username the short user name whose permissions to be revoked
+ * @param table optional table name the permissions apply
+ * @param family optional column family
+ * @param qualifier optional qualifier
+ * @param actions the permissions to be revoked
+ * @return A {@link AccessControlProtos} RevokeRequest
+ */
+ public static AccessControlProtos.RevokeRequest buildRevokeRequest(
+ String username, byte[] table, byte[] family, byte[] qualifier,
+ AccessControlProtos.Permission.Action... actions) {
+ AccessControlProtos.Permission.Builder permissionBuilder =
+ AccessControlProtos.Permission.newBuilder();
+ for (AccessControlProtos.Permission.Action a : actions) {
+ permissionBuilder.addAction(a);
+ }
+ if (table != null) {
+ permissionBuilder.setTable(ByteString.copyFrom(table));
+ }
+ if (family != null) {
+ permissionBuilder.setFamily(ByteString.copyFrom(family));
+ }
+ if (qualifier != null) {
+ permissionBuilder.setQualifier(ByteString.copyFrom(qualifier));
+ }
+
+ return AccessControlProtos.RevokeRequest.newBuilder()
+ .setPermission(
+ AccessControlProtos.UserPermission.newBuilder()
+ .setUser(ByteString.copyFromUtf8(username))
+ .setPermission(permissionBuilder.build())
+ ).build();
+ }
+
+ /**
+ * Create a RegionOpenInfo based on given region info and version of offline node
+ */
+ private static RegionOpenInfo buildRegionOpenInfo(
+ final HRegionInfo region, final int versionOfOfflineNode) {
+ RegionOpenInfo.Builder builder = RegionOpenInfo.newBuilder();
+ builder.setRegion(HRegionInfo.convert(region));
+ if (versionOfOfflineNode >= 0) {
+ builder.setVersionOfOfflineNode(versionOfOfflineNode);
+ }
+ return builder.build();
+ }
+}
Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.protobuf;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.RpcController;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.UserPermissionsResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ActionResult;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CatalogScanResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorResponse;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
+import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
+import org.apache.hadoop.hbase.security.access.UserPermission;
+import org.apache.hadoop.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility to build protocol buffer responses,
+ * or retrieve data from protocol buffer responses.
+ */
+@InterfaceAudience.Private
+public final class ResponseConverter {
+
+ private ResponseConverter() {
+ }
+
+// Start utilities for Client
+
+ /**
+ * Get the client Results from a protocol buffer ScanResponse
+ *
+ * @param response the protocol buffer ScanResponse
+ * @return the client Results in the response
+ */
+ public static Result[] getResults(final ScanResponse response) {
+ if (response == null) return null;
+ int count = response.getResultCount();
+ Result[] results = new Result[count];
+ for (int i = 0; i < count; i++) {
+ results[i] = ProtobufUtil.toResult(response.getResult(i));
+ }
+ return results;
+ }
+
+ /**
+ * Get the results from a protocol buffer MultiResponse
+ *
+ * @param proto the protocol buffer MultiResponse to convert
+ * @return the results in the MultiResponse
+ * @throws IOException
+ */
+ public static List<Object> getResults(
+ final ClientProtos.MultiResponse proto) throws IOException {
+ List<Object> results = new ArrayList<Object>();
+ List<ActionResult> resultList = proto.getResultList();
+ for (int i = 0, n = resultList.size(); i < n; i++) {
+ ActionResult result = resultList.get(i);
+ if (result.hasException()) {
+ results.add(ProtobufUtil.toException(result.getException()));
+ } else if (result.hasValue()) {
+ ClientProtos.Result r = result.getValue();
+ Object value = ProtobufUtil.toResult(r);
+ if (value instanceof ClientProtos.Result) {
+ results.add(ProtobufUtil.toResult((ClientProtos.Result)value));
+ } else {
+ results.add(value);
+ }
+ } else {
+ results.add(new Result());
+ }
+ }
+ return results;
+ }
+
+ /**
+ * Wrap a throwable to an action result.
+ *
+ * @param t
+ * @return an action result
+ */
+ public static ActionResult buildActionResult(final Throwable t) {
+ ActionResult.Builder builder = ActionResult.newBuilder();
+ NameBytesPair.Builder parameterBuilder = NameBytesPair.newBuilder();
+ parameterBuilder.setName(t.getClass().getName());
+ parameterBuilder.setValue(
+ ByteString.copyFromUtf8(StringUtils.stringifyException(t)));
+ builder.setException(parameterBuilder.build());
+ return builder.build();
+ }
+
+ /**
+ * Converts the permissions list into a protocol buffer UserPermissionsResponse
+ */
+ public static UserPermissionsResponse buildUserPermissionsResponse(
+ final List<UserPermission> permissions) {
+ UserPermissionsResponse.Builder builder = UserPermissionsResponse.newBuilder();
+ for (UserPermission perm : permissions) {
+ builder.addPermission(ProtobufUtil.toUserPermission(perm));
+ }
+ return builder.build();
+ }
+
+// End utilities for Client
+// Start utilities for Admin
+
+ /**
+ * Get the list of regions to flush from a RollLogWriterResponse
+ *
+ * @param proto the RollLogWriterResponse
+ * @return the the list of regions to flush
+ */
+ public static byte[][] getRegions(final RollWALWriterResponse proto) {
+ if (proto == null || proto.getRegionToFlushCount() == 0) return null;
+ List<byte[]> regions = new ArrayList<byte[]>();
+ for (ByteString region: proto.getRegionToFlushList()) {
+ regions.add(region.toByteArray());
+ }
+ return (byte[][])regions.toArray();
+ }
+
+ /**
+ * Get the list of region info from a GetOnlineRegionResponse
+ *
+ * @param proto the GetOnlineRegionResponse
+ * @return the list of region info
+ */
+ public static List<HRegionInfo> getRegionInfos(final GetOnlineRegionResponse proto) {
+ if (proto == null || proto.getRegionInfoCount() == 0) return null;
+ return ProtobufUtil.getRegionInfos(proto);
+ }
+
+ /**
+ * Get the region opening state from a OpenRegionResponse
+ *
+ * @param proto the OpenRegionResponse
+ * @return the region opening state
+ */
+ public static RegionOpeningState getRegionOpeningState
+ (final OpenRegionResponse proto) {
+ if (proto == null || proto.getOpeningStateCount() != 1) return null;
+ return RegionOpeningState.valueOf(
+ proto.getOpeningState(0).name());
+ }
+
+ /**
+ * Get a list of region opening state from a OpenRegionResponse
+ *
+ * @param proto the OpenRegionResponse
+ * @return the list of region opening state
+ */
+ public static List<RegionOpeningState> getRegionOpeningStateList(
+ final OpenRegionResponse proto) {
+ if (proto == null) return null;
+ List<RegionOpeningState> regionOpeningStates = new ArrayList<RegionOpeningState>();
+ for (int i = 0; i < proto.getOpeningStateCount(); i++) {
+ regionOpeningStates.add(RegionOpeningState.valueOf(
+ proto.getOpeningState(i).name()));
+ }
+ return regionOpeningStates;
+ }
+
+ /**
+ * Check if the region is closed from a CloseRegionResponse
+ *
+ * @param proto the CloseRegionResponse
+ * @return the region close state
+ */
+ public static boolean isClosed
+ (final CloseRegionResponse proto) {
+ if (proto == null || !proto.hasClosed()) return false;
+ return proto.getClosed();
+ }
+
+ /**
+ * A utility to build a GetServerInfoResponse.
+ *
+ * @param serverName
+ * @param webuiPort
+ * @return the response
+ */
+ public static GetServerInfoResponse buildGetServerInfoResponse(
+ final ServerName serverName, final int webuiPort) {
+ GetServerInfoResponse.Builder builder = GetServerInfoResponse.newBuilder();
+ ServerInfo.Builder serverInfoBuilder = ServerInfo.newBuilder();
+ serverInfoBuilder.setServerName(ProtobufUtil.toServerName(serverName));
+ if (webuiPort >= 0) {
+ serverInfoBuilder.setWebuiPort(webuiPort);
+ }
+ builder.setServerInfo(serverInfoBuilder.build());
+ return builder.build();
+ }
+
+ /**
+ * A utility to build a GetOnlineRegionResponse.
+ *
+ * @param regions
+ * @return the response
+ */
+ public static GetOnlineRegionResponse buildGetOnlineRegionResponse(
+ final List<HRegionInfo> regions) {
+ GetOnlineRegionResponse.Builder builder = GetOnlineRegionResponse.newBuilder();
+ for (HRegionInfo region: regions) {
+ builder.addRegionInfo(HRegionInfo.convert(region));
+ }
+ return builder.build();
+ }
+
+ /**
+ * Creates a response for the catalog scan request
+ * @return A CatalogScanResponse
+ */
+ public static CatalogScanResponse buildCatalogScanResponse(int numCleaned) {
+ return CatalogScanResponse.newBuilder().setScanResult(numCleaned).build();
+ }
+
+ /**
+ * Creates a response for the catalog scan request
+ * @return A EnableCatalogJanitorResponse
+ */
+ public static EnableCatalogJanitorResponse buildEnableCatalogJanitorResponse(boolean prevValue) {
+ return EnableCatalogJanitorResponse.newBuilder().setPrevValue(prevValue).build();
+ }
+
+// End utilities for Admin
+
+ /**
+ * Creates a response for the last flushed sequence Id request
+ * @return A GetLastFlushedSequenceIdResponse
+ */
+ public static GetLastFlushedSequenceIdResponse buildGetLastFlushedSequenceIdResponse(
+ long seqId) {
+ return GetLastFlushedSequenceIdResponse.newBuilder().setLastFlushedSequenceId(seqId).build();
+ }
+
+ /**
+ * Stores an exception encountered during RPC invocation so it can be passed back
+ * through to the client.
+ * @param controller the controller instance provided by the client when calling the service
+ * @param ioe the exception encountered
+ */
+ public static void setControllerException(RpcController controller, IOException ioe) {
+ if (controller != null) {
+ if (controller instanceof ServerRpcController) {
+ ((ServerRpcController)controller).setFailedOn(ioe);
+ } else {
+ controller.setFailed(StringUtils.stringifyException(ioe));
+ }
+ }
+ }
+}
Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/package.html
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/package.html?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/package.html (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/package.html Mon Feb 25 22:50:17 2013
@@ -0,0 +1,30 @@
+<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
+<html>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<head />
+<body bgcolor="white">
+Holds classes generated from <a href="http://code.google.com/apis/protocolbuffers/">protobuf</a>
+<code>src/main/protobuf</code> definition files.
+
+<p>See under <code>src/main/protobuf</code> for instruction on how to generate the content under
+the <code>generated</code> subpackage.
+</p>
+</body>
+</html>
Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/BloomType.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/BloomType.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/BloomType.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/BloomType.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,35 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+public enum BloomType {
+ /**
+ * Bloomfilters disabled
+ */
+ NONE,
+ /**
+ * Bloom enabled with Table row as Key
+ */
+ ROW,
+ /**
+ * Bloom enabled with Table row & column (family+qualifier) as Key
+ */
+ ROWCOL
+}
\ No newline at end of file
Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/RegionOpeningState.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/RegionOpeningState.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/RegionOpeningState.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/RegionOpeningState.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,31 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+public enum RegionOpeningState {
+
+ OPENED,
+
+ ALREADY_OPENED,
+
+ FAILED_OPENING;
+}
Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,206 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This class acts as a wrapper for all the objects used to identify and
+ * communicate with remote peers and is responsible for answering to expired
+ * sessions and re-establishing the ZK connections.
+ */
+@InterfaceAudience.Private
+public class ReplicationPeer implements Abortable, Closeable {
+ private static final Log LOG = LogFactory.getLog(ReplicationPeer.class);
+
+ private final String clusterKey;
+ private final String id;
+ private List<ServerName> regionServers = new ArrayList<ServerName>(0);
+ private final AtomicBoolean peerEnabled = new AtomicBoolean();
+ // Cannot be final since a new object needs to be recreated when session fails
+ private ZooKeeperWatcher zkw;
+ private final Configuration conf;
+
+ private PeerStateTracker peerStateTracker;
+
+ /**
+ * Constructor that takes all the objects required to communicate with the
+ * specified peer, except for the region server addresses.
+ * @param conf configuration object to this peer
+ * @param key cluster key used to locate the peer
+ * @param id string representation of this peer's identifier
+ */
+ public ReplicationPeer(Configuration conf, String key,
+ String id) throws IOException {
+ this.conf = conf;
+ this.clusterKey = key;
+ this.id = id;
+ this.reloadZkWatcher();
+ }
+
+ /**
+ * start a state tracker to check whether this peer is enabled or not
+ *
+ * @param zookeeper zk watcher for the local cluster
+ * @param peerStateNode path to zk node which stores peer state
+ * @throws KeeperException
+ */
+ public void startStateTracker(ZooKeeperWatcher zookeeper, String peerStateNode)
+ throws KeeperException {
+ ReplicationZookeeper.ensurePeerEnabled(zookeeper, peerStateNode);
+ this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this);
+ this.peerStateTracker.start();
+ try {
+ this.readPeerStateZnode();
+ } catch (DeserializationException e) {
+ throw ZKUtil.convert(e);
+ }
+ }
+
+ private void readPeerStateZnode() throws DeserializationException {
+ this.peerEnabled.set(ReplicationZookeeper.isStateEnabled(this.peerStateTracker.getData(false)));
+ }
+
+ /**
+ * Get the cluster key of that peer
+ * @return string consisting of zk ensemble addresses, client port
+ * and root znode
+ */
+ public String getClusterKey() {
+ return clusterKey;
+ }
+
+ /**
+ * Get the state of this peer
+ * @return atomic boolean that holds the status
+ */
+ public AtomicBoolean getPeerEnabled() {
+ return peerEnabled;
+ }
+
+ /**
+ * Get a list of all the addresses of all the region servers
+ * for this peer cluster
+ * @return list of addresses
+ */
+ public List<ServerName> getRegionServers() {
+ return regionServers;
+ }
+
+ /**
+ * Set the list of region servers for that peer
+ * @param regionServers list of addresses for the region servers
+ */
+ public void setRegionServers(List<ServerName> regionServers) {
+ this.regionServers = regionServers;
+ }
+
+ /**
+ * Get the ZK connection to this peer
+ * @return zk connection
+ */
+ public ZooKeeperWatcher getZkw() {
+ return zkw;
+ }
+
+ /**
+ * Get the identifier of this peer
+ * @return string representation of the id (short)
+ */
+ public String getId() {
+ return id;
+ }
+
+ /**
+ * Get the configuration object required to communicate with this peer
+ * @return configuration object
+ */
+ public Configuration getConfiguration() {
+ return conf;
+ }
+
+ @Override
+ public void abort(String why, Throwable e) {
+ LOG.fatal("The ReplicationPeer coresponding to peer " + clusterKey
+ + " was aborted for the following reason(s):" + why, e);
+ }
+
+ /**
+ * Closes the current ZKW (if not null) and creates a new one
+ * @throws IOException If anything goes wrong connecting
+ */
+ public void reloadZkWatcher() throws IOException {
+ if (zkw != null) zkw.close();
+ zkw = new ZooKeeperWatcher(conf,
+ "connection to cluster: " + id, this);
+ }
+
+ @Override
+ public boolean isAborted() {
+ // Currently the replication peer is never "Aborted", we just log when the
+ // abort method is called.
+ return false;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (zkw != null){
+ zkw.close();
+ }
+ }
+
+ /**
+ * Tracker for state of this peer
+ */
+ public class PeerStateTracker extends ZooKeeperNodeTracker {
+
+ public PeerStateTracker(String peerStateZNode, ZooKeeperWatcher watcher,
+ Abortable abortable) {
+ super(watcher, peerStateZNode, abortable);
+ }
+
+ @Override
+ public synchronized void nodeDataChanged(String path) {
+ if (path.equals(node)) {
+ super.nodeDataChanged(path);
+ try {
+ readPeerStateZnode();
+ } catch (DeserializationException e) {
+ LOG.warn("Failed deserializing the content of " + path, e);
+ }
+ }
+ }
+ }
+}
Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,166 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * ReplicationStateImpl is responsible for maintaining the replication state
+ * znode.
+ */
+public class ReplicationStateImpl implements ReplicationStateInterface {
+
+ private ReplicationStateTracker stateTracker;
+ private final String stateZnode;
+ private final ZooKeeperWatcher zookeeper;
+ private final Abortable abortable;
+ private final AtomicBoolean replicating;
+
+ private static final Log LOG = LogFactory.getLog(ReplicationStateImpl.class);
+
+ public ReplicationStateImpl(final ZooKeeperWatcher zk, final String stateZnode,
+ final Abortable abortable, final AtomicBoolean replicating) {
+ this.zookeeper = zk;
+ this.stateZnode = stateZnode;
+ this.abortable = abortable;
+ this.replicating = replicating;
+
+ // Set a tracker on replicationStateNode
+ this.stateTracker = new ReplicationStateTracker(this.zookeeper, this.stateZnode,
+ this.abortable);
+ stateTracker.start();
+ readReplicationStateZnode();
+ }
+
+ public boolean getState() throws KeeperException {
+ return getReplication();
+ }
+
+ public void setState(boolean newState) throws KeeperException {
+ setReplicating(newState);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (stateTracker != null) stateTracker.stop();
+ }
+
+ /**
+ * @param bytes
+ * @return True if the passed in <code>bytes</code> are those of a pb
+ * serialized ENABLED state.
+ * @throws DeserializationException
+ */
+ private boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
+ ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
+ return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
+ }
+
+ /**
+ * @param bytes Content of a state znode.
+ * @return State parsed from the passed bytes.
+ * @throws DeserializationException
+ */
+ private ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
+ throws DeserializationException {
+ ProtobufUtil.expectPBMagicPrefix(bytes);
+ int pblen = ProtobufUtil.lengthOfPBMagic();
+ ZooKeeperProtos.ReplicationState.Builder builder = ZooKeeperProtos.ReplicationState
+ .newBuilder();
+ ZooKeeperProtos.ReplicationState state;
+ try {
+ state = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
+ return state.getState();
+ } catch (InvalidProtocolBufferException e) {
+ throw new DeserializationException(e);
+ }
+ }
+
+ /**
+ * Set the new replication state for this cluster
+ * @param newState
+ */
+ private void setReplicating(boolean newState) throws KeeperException {
+ ZKUtil.createWithParents(this.zookeeper, this.stateZnode);
+ byte[] stateBytes = (newState == true) ? ReplicationZookeeper.ENABLED_ZNODE_BYTES
+ : ReplicationZookeeper.DISABLED_ZNODE_BYTES;
+ ZKUtil.setData(this.zookeeper, this.stateZnode, stateBytes);
+ }
+
+ /**
+ * Get the replication status of this cluster. If the state znode doesn't
+ * exist it will also create it and set it true.
+ * @return returns true when it's enabled, else false
+ * @throws KeeperException
+ */
+ private boolean getReplication() throws KeeperException {
+ byte[] data = this.stateTracker.getData(false);
+ if (data == null || data.length == 0) {
+ setReplicating(true);
+ return true;
+ }
+ try {
+ return isStateEnabled(data);
+ } catch (DeserializationException e) {
+ throw ZKUtil.convert(e);
+ }
+ }
+
+ /**
+ * This reads the state znode for replication and sets the atomic boolean
+ */
+ private void readReplicationStateZnode() {
+ try {
+ this.replicating.set(getReplication());
+ LOG.info("Replication is now " + (this.replicating.get() ? "started" : "stopped"));
+ } catch (KeeperException e) {
+ this.abortable.abort("Failed getting data on from " + this.stateZnode, e);
+ }
+ }
+
+ /**
+ * Tracker for status of the replication
+ */
+ private class ReplicationStateTracker extends ZooKeeperNodeTracker {
+ public ReplicationStateTracker(ZooKeeperWatcher watcher, String stateZnode, Abortable abortable) {
+ super(watcher, stateZnode, abortable);
+ }
+
+ @Override
+ public synchronized void nodeDataChanged(String path) {
+ if (path.equals(node)) {
+ super.nodeDataChanged(path);
+ readReplicationStateZnode();
+ }
+ }
+ }
+}
\ No newline at end of file
Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.zookeeper.KeeperException;
+
+import java.io.Closeable;
+
+/**
+ * This provides an interface for getting and setting the replication state of a
+ * cluster. This state is used to indicate whether replication is enabled or
+ * disabled on a cluster.
+ */
+public interface ReplicationStateInterface extends Closeable {
+
+ /**
+ * Get the current state of replication (i.e. ENABLED or DISABLED).
+ *
+ * @return true if replication is enabled, false otherwise
+ * @throws KeeperException
+ */
+ public boolean getState() throws KeeperException;
+
+ /**
+ * Set the state of replication.
+ *
+ * @param newState
+ * @throws KeeperException
+ */
+ public void setState(boolean newState) throws KeeperException;
+}
\ No newline at end of file