Posted to commits@hbase.apache.org by sy...@apache.org on 2016/10/07 19:14:59 UTC

[31/77] [abbrv] [partial] hbase git commit: HBASE-15638 Shade protobuf Which includes

http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
new file mode 100644
index 0000000..1849d90
--- /dev/null
+++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.access;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
+import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadHFilesRequest;
+import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadHFilesResponse;
+import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadService;
+import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+
+/**
+ * Coprocessor service for bulk loads in secure mode.
+ * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0
+ */
+@InterfaceAudience.Private
+@Deprecated
+public class SecureBulkLoadEndpoint extends SecureBulkLoadService
+    implements CoprocessorService, Coprocessor {
+
+  public static final long VERSION = 0L;
+
+  private static final Log LOG = LogFactory.getLog(SecureBulkLoadEndpoint.class);
+
+  private RegionCoprocessorEnvironment env;
+
+  @Override
+  public void start(CoprocessorEnvironment env) {
+    this.env = (RegionCoprocessorEnvironment)env;
+    LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in future releases.");
+    LOG.warn("Secure bulk load has been integrated into HBase core.");
+  }
+
+  @Override
+  public void stop(CoprocessorEnvironment env) throws IOException {
+  }
+
+  @Override
+  public void prepareBulkLoad(RpcController controller, PrepareBulkLoadRequest request,
+      RpcCallback<PrepareBulkLoadResponse> done) {
+    try {
+      SecureBulkLoadManager secureBulkLoadManager =
+          this.env.getRegionServerServices().getSecureBulkLoadManager();
+      String bulkToken = secureBulkLoadManager.prepareBulkLoad(this.env.getRegion(),
+          convert(request));
+      done.run(PrepareBulkLoadResponse.newBuilder().setBulkToken(bulkToken).build());
+    } catch (IOException e) {
+      CoprocessorRpcUtils.setControllerException(controller, e);
+      // Only run the callback with a null response on failure; on success it has
+      // already been run above, and running it twice is an error.
+      done.run(null);
+    }
+  }
+
+  org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest
+      convert(PrepareBulkLoadRequest request)
+      throws org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException {
+    // Bridge the unshaded request over to the shaded runtime by re-parsing its wire bytes.
+    byte[] bytes = request.toByteArray();
+    return org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest
+        .newBuilder().mergeFrom(bytes).build();
+  }
+
+  @Override
+  public void cleanupBulkLoad(RpcController controller, CleanupBulkLoadRequest request,
+      RpcCallback<CleanupBulkLoadResponse> done) {
+    try {
+      SecureBulkLoadManager secureBulkLoadManager =
+          this.env.getRegionServerServices().getSecureBulkLoadManager();
+      secureBulkLoadManager.cleanupBulkLoad(this.env.getRegion(), convert(request));
+      done.run(CleanupBulkLoadResponse.newBuilder().build());
+    } catch (IOException e) {
+      CoprocessorRpcUtils.setControllerException(controller, e);
+      // Only run the callback with a null response on failure; on success it has
+      // already been run above.
+      done.run(null);
+    }
+  }
+
+  org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest
+      convert(CleanupBulkLoadRequest request)
+      throws org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException {
+    byte[] bytes = request.toByteArray();
+    return org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest
+        .newBuilder().mergeFrom(bytes).build();
+  }
+
+  @Override
+  public void secureBulkLoadHFiles(RpcController controller, SecureBulkLoadHFilesRequest request,
+      RpcCallback<SecureBulkLoadHFilesResponse> done) {
+    boolean loaded = false;
+    try {
+      SecureBulkLoadManager secureBulkLoadManager =
+          this.env.getRegionServerServices().getSecureBulkLoadManager();
+      BulkLoadHFileRequest bulkLoadHFileRequest = convertSecureBulkLoadHFilesRequest(request);
+      loaded = secureBulkLoadManager.secureBulkLoadHFiles(this.env.getRegion(),
+          convert(bulkLoadHFileRequest));
+    } catch (IOException e) {
+      CoprocessorRpcUtils.setControllerException(controller, e);
+    }
+    done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(loaded).build());
+  }
+
+  org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest
+      convert(BulkLoadHFileRequest request)
+      throws org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException {
+    byte[] bytes = request.toByteArray();
+    return org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest
+        .newBuilder().mergeFrom(bytes).build();
+  }
+
+  private BulkLoadHFileRequest convertSecureBulkLoadHFilesRequest(
+      SecureBulkLoadHFilesRequest request) {
+    BulkLoadHFileRequest.Builder bulkLoadHFileRequest = BulkLoadHFileRequest.newBuilder();
+    RegionSpecifier region =
+        ProtobufUtil.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, this.env
+            .getRegionInfo().getRegionName());
+    bulkLoadHFileRequest.setRegion(region).setFsToken(request.getFsToken())
+        .setBulkToken(request.getBulkToken()).setAssignSeqNum(request.getAssignSeqNum())
+        .addAllFamilyPath(request.getFamilyPathList());
+    return bulkLoadHFileRequest.build();
+  }
+
+  @Override
+  public Service getService() {
+    return this;
+  }
+}
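
The convert() helpers above are the heart of the shading change: the endpoint still receives
messages built against the unshaded com.google.protobuf runtime, while SecureBulkLoadManager in
core now speaks the shaded org.apache.hadoop.hbase.shaded.* classes. Shading renames packages but
leaves the wire format untouched, so the two runtimes can be bridged by a byte-level round trip.
A minimal sketch of the same pattern (parseFrom is equivalent to the
newBuilder().mergeFrom().build() form used above):

// Sketch only: convert an unshaded message to its shaded twin via the wire format.
// The same trick works for any pair of classes generated from one .proto definition.
static org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest
    toShaded(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest req)
    throws org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException {
  byte[] wire = req.toByteArray();  // serialize with the unshaded runtime
  return org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest
      .parseFrom(wire);             // re-parse with the shaded runtime
}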

http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/main/protobuf/Aggregate.proto
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/main/protobuf/Aggregate.proto b/hbase-endpoint/src/main/protobuf/Aggregate.proto
new file mode 100644
index 0000000..4d32e70
--- /dev/null
+++ b/hbase-endpoint/src/main/protobuf/Aggregate.proto
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package hbase.pb;
+
+option java_package = "org.apache.hadoop.hbase.protobuf.generated";
+option java_outer_classname = "AggregateProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+import "Client.proto";
+
+message AggregateRequest {
+  /** The request passed to the AggregateService consists of three parts
+   *  (1) the (canonical) classname of the ColumnInterpreter implementation
+   *  (2) the Scan query
+   *  (3) any bytes required to construct the ColumnInterpreter object
+   *      properly
+   */
+  required string interpreter_class_name = 1;
+  required Scan scan = 2;
+  optional bytes  interpreter_specific_bytes = 3;
+}
+
+message AggregateResponse {
+  /**
+   * The AggregateService methods all have a response that either is a Pair
+   * or a simple object. When it is a Pair both first_part and second_part
+   * have defined values (and the second_part is not present in the response
+   * when the response is not a pair). Refer to the AggregateImplementation 
+   * class for an overview of the AggregateResponse object constructions. 
+   */ 
+  repeated bytes first_part = 1;
+  optional bytes second_part = 2;
+}
+
+/** Refer to the AggregateImplementation class for an overview of the 
+ *  AggregateService method implementations and their functionality.
+ */
+service AggregateService {
+  rpc GetMax (AggregateRequest) returns (AggregateResponse);
+  rpc GetMin (AggregateRequest) returns (AggregateResponse);
+  rpc GetSum (AggregateRequest) returns (AggregateResponse);
+  rpc GetRowNum (AggregateRequest) returns (AggregateResponse);
+  rpc GetAvg (AggregateRequest) returns (AggregateResponse);
+  rpc GetStd (AggregateRequest) returns (AggregateResponse);
+  rpc GetMedian (AggregateRequest) returns (AggregateResponse);
+}
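
For reference, the stock client for this service is
org.apache.hadoop.hbase.client.coprocessor.AggregationClient, which fans the request out to every
region covered by the Scan and merges the per-region partial results. A hedged usage sketch,
assuming AggregateImplementation is loaded on an illustrative table "mytable" whose column cf:q
holds 8-byte longs (as LongColumnInterpreter expects):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import org.apache.hadoop.hbase.util.Bytes;

public class AggregationClientSketch {
  public static void main(String[] args) throws Throwable {
    Configuration conf = HBaseConfiguration.create();
    AggregationClient aggregationClient = new AggregationClient(conf);
    Scan scan = new Scan();
    // The interpreter needs exactly one column; it decodes each value as a long.
    scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"));
    // GetSum runs once per region; the client merges the partial sums.
    long sum = aggregationClient.sum(TableName.valueOf("mytable"),
        new LongColumnInterpreter(), scan);
    System.out.println("sum = " + sum);
  }
}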

http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/main/protobuf/BulkDelete.proto
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/main/protobuf/BulkDelete.proto b/hbase-endpoint/src/main/protobuf/BulkDelete.proto
new file mode 100644
index 0000000..c2ec8ca
--- /dev/null
+++ b/hbase-endpoint/src/main/protobuf/BulkDelete.proto
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package hbase.pb;
+
+option java_package = "org.apache.hadoop.hbase.coprocessor.example.generated";
+option java_outer_classname = "BulkDeleteProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+import "Client.proto";
+
+message BulkDeleteRequest {
+  required Scan scan = 1;
+  required DeleteType deleteType = 2;
+  optional uint64 timestamp = 3;
+  required uint32 rowBatchSize = 4;
+  
+  enum DeleteType {
+    ROW = 0;
+    FAMILY = 1;
+    COLUMN = 2;
+    VERSION = 3;
+  }
+}
+
+message BulkDeleteResponse {
+  required uint64 rowsDeleted = 1; 
+  optional uint64 versionsDeleted = 2;
+}
+
+service BulkDeleteService {
+  rpc delete(BulkDeleteRequest)
+    returns (BulkDeleteResponse);
+}
\ No newline at end of file
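
BulkDeleteService pushes the scan-and-delete loop to the server, so whole rows, families,
columns, or versions can be removed without shipping the data back to the client. A hedged
invocation sketch modeled on the BulkDeleteEndpoint javadoc (the batch size of 500 is arbitrary;
connection setup and error handling are elided):

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest;
import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest.DeleteType;
import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteResponse;
import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteService;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;

public class BulkDeleteSketch {
  static long bulkDeleteRows(Table table) throws Throwable {
    final BulkDeleteRequest request = BulkDeleteRequest.newBuilder()
        .setScan(ProtobufUtil.toScan(new Scan()))  // delete every row the scan selects
        .setDeleteType(DeleteType.ROW)
        .setRowBatchSize(500)
        .build();
    // One call per region; each response reports how many rows that region deleted.
    Map<byte[], BulkDeleteResponse> responses = table.coprocessorService(
        BulkDeleteService.class, null, null,
        new Batch.Call<BulkDeleteService, BulkDeleteResponse>() {
          @Override
          public BulkDeleteResponse call(BulkDeleteService service) throws IOException {
            ServerRpcController controller = new ServerRpcController();
            CoprocessorRpcUtils.BlockingRpcCallback<BulkDeleteResponse> callback =
                new CoprocessorRpcUtils.BlockingRpcCallback<>();
            service.delete(controller, request, callback);
            return callback.get();
          }
        });
    long deleted = 0;
    for (BulkDeleteResponse r : responses.values()) {
      deleted += r.getRowsDeleted();
    }
    return deleted;
  }
}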

http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/main/protobuf/ColumnAggregationNullResponseProtocol.proto
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/main/protobuf/ColumnAggregationNullResponseProtocol.proto b/hbase-endpoint/src/main/protobuf/ColumnAggregationNullResponseProtocol.proto
new file mode 100644
index 0000000..b4dc01e
--- /dev/null
+++ b/hbase-endpoint/src/main/protobuf/ColumnAggregationNullResponseProtocol.proto
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// Coprocessor test
+option java_package = "org.apache.hadoop.hbase.coprocessor.protobuf.generated";
+option java_outer_classname = "ColumnAggregationWithNullResponseProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+// use unique names for messages in ColumnAggregationXXX.protos due to a bug in
+// protoc or hadoop's protoc compiler.
+message ColumnAggregationNullResponseSumRequest {
+  required bytes family = 1;
+  optional bytes qualifier = 2;
+}
+
+message ColumnAggregationNullResponseSumResponse {
+  optional int64 sum = 1;
+}
+
+service ColumnAggregationServiceNullResponse {
+  rpc sum(ColumnAggregationNullResponseSumRequest)
+    returns(ColumnAggregationNullResponseSumResponse);
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/main/protobuf/ColumnAggregationProtocol.proto
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/main/protobuf/ColumnAggregationProtocol.proto b/hbase-endpoint/src/main/protobuf/ColumnAggregationProtocol.proto
new file mode 100644
index 0000000..ad1acda
--- /dev/null
+++ b/hbase-endpoint/src/main/protobuf/ColumnAggregationProtocol.proto
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// Coprocessor test
+option java_package = "org.apache.hadoop.hbase.coprocessor.protobuf.generated";
+option java_outer_classname = "ColumnAggregationProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+message SumRequest {
+  required bytes family = 1;
+  optional bytes qualifier = 2;
+}
+
+message SumResponse {
+  required int64 sum = 1;
+}
+
+service ColumnAggregationService {
+  rpc sum(SumRequest) returns(SumResponse);
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/main/protobuf/ColumnAggregationWithErrorsProtocol.proto
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/main/protobuf/ColumnAggregationWithErrorsProtocol.proto b/hbase-endpoint/src/main/protobuf/ColumnAggregationWithErrorsProtocol.proto
new file mode 100644
index 0000000..7808949
--- /dev/null
+++ b/hbase-endpoint/src/main/protobuf/ColumnAggregationWithErrorsProtocol.proto
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// Coprocessor test
+option java_package = "org.apache.hadoop.hbase.coprocessor.protobuf.generated";
+option java_outer_classname = "ColumnAggregationWithErrorsProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+// use unique names for messages in ColumnAggregationXXX.protos due to a bug in
+// protoc or hadoop's protoc compiler.
+message ColumnAggregationWithErrorsSumRequest {
+  required bytes family = 1;
+  optional bytes qualifier = 2;
+}
+
+message ColumnAggregationWithErrorsSumResponse {
+  required int64 sum = 1;
+}
+
+service ColumnAggregationServiceWithErrors {
+  rpc sum(ColumnAggregationWithErrorsSumRequest)
+    returns(ColumnAggregationWithErrorsSumResponse);
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/main/protobuf/DummyRegionServerEndpoint.proto
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/main/protobuf/DummyRegionServerEndpoint.proto b/hbase-endpoint/src/main/protobuf/DummyRegionServerEndpoint.proto
new file mode 100644
index 0000000..539f7da
--- /dev/null
+++ b/hbase-endpoint/src/main/protobuf/DummyRegionServerEndpoint.proto
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package hbase.test.pb;
+
+// Coprocessor test
+option java_package = "org.apache.hadoop.hbase.coprocessor.protobuf.generated";
+option java_outer_classname = "DummyRegionServerEndpointProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+message DummyRequest {
+}
+
+message DummyResponse {
+  required string value = 1;
+}
+
+service DummyService {
+  rpc dummyCall(DummyRequest) returns(DummyResponse);
+  rpc dummyThrow(DummyRequest) returns(DummyResponse);
+}
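
Unlike the region-scoped services above, DummyService exercises region server-level coprocessors,
so a client reaches it through the Admin endpoint channel rather than through a Table. A hedged
sketch (serverName is any live region server; dummyThrow is meant to fail, per the tests):

// Sketch: invoking a region server-level endpoint through the admin RPC channel.
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyService;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;

public class DummyServiceSketch {
  static String callDummy(Admin admin, ServerName serverName) throws Exception {
    CoprocessorRpcChannel channel = admin.coprocessorService(serverName);
    DummyService.BlockingInterface stub = DummyService.newBlockingStub(channel);
    DummyResponse response = stub.dummyCall(null, DummyRequest.getDefaultInstance());
    return response.getValue();
  }
}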

http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/main/protobuf/IncrementCounterProcessor.proto
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/main/protobuf/IncrementCounterProcessor.proto b/hbase-endpoint/src/main/protobuf/IncrementCounterProcessor.proto
new file mode 100644
index 0000000..b8c77ca
--- /dev/null
+++ b/hbase-endpoint/src/main/protobuf/IncrementCounterProcessor.proto
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.hbase.coprocessor.protobuf.generated";
+option java_outer_classname = "IncrementCounterProcessorTestProtos";
+option java_generate_equals_and_hash = true;
+
+message IncCounterProcessorRequest {
+  required bytes row = 1;
+  required int32 counter = 2;
+}
+
+message IncCounterProcessorResponse {
+  required int32 response = 1;
+}
+
+message FriendsOfFriendsProcessorRequest {
+  required bytes person = 1;
+  required bytes row = 2;
+  repeated string result = 3;
+}
+
+message FriendsOfFriendsProcessorResponse {
+  repeated string result = 1;
+}
+
+message RowSwapProcessorRequest {
+  required bytes row1 = 1;
+  required bytes row2 = 2;
+}
+
+message RowSwapProcessorResponse {
+}
+
+message TimeoutProcessorRequest {
+  required bytes row = 1;
+}
+
+message TimeoutProcessorResponse {
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/main/protobuf/SecureBulkLoad.proto
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/main/protobuf/SecureBulkLoad.proto b/hbase-endpoint/src/main/protobuf/SecureBulkLoad.proto
new file mode 100644
index 0000000..d86d162
--- /dev/null
+++ b/hbase-endpoint/src/main/protobuf/SecureBulkLoad.proto
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package hbase.pb;
+
+option java_package = "org.apache.hadoop.hbase.protobuf.generated";
+option java_outer_classname = "SecureBulkLoadProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+import 'Client.proto';
+
+message SecureBulkLoadHFilesRequest {
+  repeated BulkLoadHFileRequest.FamilyPath family_path = 1;
+  optional bool assign_seq_num = 2;
+  required DelegationToken fs_token = 3;
+  required string bulk_token = 4;
+}
+
+message SecureBulkLoadHFilesResponse {
+  required bool loaded = 1;
+}
+
+service SecureBulkLoadService {
+    rpc PrepareBulkLoad(PrepareBulkLoadRequest)
+      returns (PrepareBulkLoadResponse);
+
+    rpc SecureBulkLoadHFiles(SecureBulkLoadHFilesRequest)
+      returns (SecureBulkLoadHFilesResponse);
+
+    rpc CleanupBulkLoad(CleanupBulkLoadRequest)
+      returns (CleanupBulkLoadResponse);
+}
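
This service definition is the whole secure bulk load conversation: PrepareBulkLoad hands back a
bulk token naming a staging directory, the client copies its HFiles there, SecureBulkLoadHFiles
presents an HDFS delegation token so the region server can move those files on the client's
behalf, and CleanupBulkLoad removes the staging directory. A hedged sketch of that sequence over
a blocking stub; prepareRequest, familyPaths, and fsToken are assumed pre-built inputs whose
construction is elided:

import java.util.List;

import com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.DelegationToken;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadHFilesRequest;
import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadService;

public class SecureBulkLoadSketch {
  static boolean load(CoprocessorRpcChannel channel, PrepareBulkLoadRequest prepareRequest,
      List<BulkLoadHFileRequest.FamilyPath> familyPaths, DelegationToken fsToken)
      throws ServiceException {
    SecureBulkLoadService.BlockingInterface stub = SecureBulkLoadService.newBlockingStub(channel);
    // 1. Obtain a bulk token naming the staging directory; the caller copies its
    //    HFiles there before the next call (that copy is outside this sketch).
    String bulkToken = stub.prepareBulkLoad(null, prepareRequest).getBulkToken();
    // 2. Ask the region server to move the staged files into the region, presenting
    //    a delegation token so it can act on the caller's files.
    boolean loaded = stub.secureBulkLoadHFiles(null,
        SecureBulkLoadHFilesRequest.newBuilder()
            .addAllFamilyPath(familyPaths)
            .setFsToken(fsToken)
            .setBulkToken(bulkToken)
            .build()).getLoaded();
    // 3. Always remove the staging directory.
    stub.cleanupBulkLoad(null,
        CleanupBulkLoadRequest.newBuilder().setBulkToken(bulkToken).build());
    return loaded;
  }
}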

http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
new file mode 100644
index 0000000..aac020d
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScannable;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.ProtobufCoprocessorService;
+import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({MediumTests.class, ClientTests.class})
+public class TestRpcControllerFactory {
+
+  public static class StaticRpcControllerFactory extends RpcControllerFactory {
+
+    public StaticRpcControllerFactory(Configuration conf) {
+      super(conf);
+    }
+
+    @Override
+    public HBaseRpcController newController() {
+      return new CountingRpcController(super.newController());
+    }
+
+    @Override
+    public HBaseRpcController newController(final CellScanner cellScanner) {
+      return new CountingRpcController(super.newController(cellScanner));
+    }
+
+    @Override
+    public HBaseRpcController newController(final List<CellScannable> cellIterables) {
+      return new CountingRpcController(super.newController(cellIterables));
+    }
+  }
+
+  public static class CountingRpcController extends DelegatingHBaseRpcController {
+
+    private static AtomicInteger INT_PRIORITY = new AtomicInteger();
+    private static AtomicInteger TABLE_PRIORITY = new AtomicInteger();
+
+    public CountingRpcController(HBaseRpcController delegate) {
+      super(delegate);
+    }
+
+    @Override
+    public void setPriority(int priority) {
+      super.setPriority(priority);
+      INT_PRIORITY.incrementAndGet();
+    }
+
+    @Override
+    public void setPriority(TableName tn) {
+      super.setPriority(tn);
+      // Ignore counts for system tables; those can change underneath us, and we only
+      // want to verify the priorities the client itself sets.
+      if (!tn.isSystemTable()) {
+        TABLE_PRIORITY.incrementAndGet();
+      }
+    }
+  }
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    // Load an endpoint so we have something to test against. It doesn't matter which one;
+    // this one already ships with the tests, so we just use it.
+    Configuration conf = UTIL.getConfiguration();
+    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+      ProtobufCoprocessorService.class.getName());
+
+    UTIL.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void teardown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Check some of the methods and make sure we are incrementing each time. It's a bit tedious
+   * to cover all methods here, and a bit brittle since new methods can be added without this
+   * test learning about them, so we just cover the major ones.
+   * @throws Exception on failure
+   */
+  @Test
+  public void testCountController() throws Exception {
+    Configuration conf = new Configuration(UTIL.getConfiguration());
+    // setup our custom controller
+    conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
+      StaticRpcControllerFactory.class.getName());
+
+    TableName name = TableName.valueOf("testcustomcontroller");
+    UTIL.createTable(name, fam1).close();
+
+    // change one of the connection properties so we get a new Connection with our configuration
+    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT + 1);
+
+    Connection connection = ConnectionFactory.createConnection(conf);
+    Table table = connection.getTable(name);
+    byte[] row = Bytes.toBytes("row");
+    Put p = new Put(row);
+    p.addColumn(fam1, fam1, Bytes.toBytes("val0"));
+    table.put(p);
+
+    Integer counter = 1;
+    counter = verifyCount(counter);
+
+    Delete d = new Delete(row);
+    d.addColumn(fam1, fam1);
+    table.delete(d);
+    counter = verifyCount(counter);
+
+    Put p2 = new Put(row);
+    p2.addColumn(fam1, Bytes.toBytes("qual"), Bytes.toBytes("val1"));
+    table.batch(Lists.newArrayList(p, p2), null);
+    // this only goes to a single server, so we don't need to change the count here
+    counter = verifyCount(counter);
+
+    Append append = new Append(row);
+    append.add(fam1, fam1, Bytes.toBytes("val2"));
+    table.append(append);
+    counter = verifyCount(counter);
+
+    // and check the major lookup calls as well
+    Get g = new Get(row);
+    table.get(g);
+    counter = verifyCount(counter);
+
+    ResultScanner scan = table.getScanner(fam1);
+    scan.next();
+    scan.close();
+    counter = verifyCount(counter + 2);
+
+    Get g2 = new Get(row);
+    table.get(Lists.newArrayList(g, g2));
+    // Same server, so as above the count does not change.
+    counter = verifyCount(counter);
+
+    // make sure all the scanner types are covered
+    Scan scanInfo = new Scan(row);
+    // regular small
+    scanInfo.setSmall(true);
+    counter = doScan(table, scanInfo, counter);
+
+    // reversed, small
+    scanInfo.setReversed(true);
+    counter = doScan(table, scanInfo, counter);
+
+    // reversed, regular
+    scanInfo.setSmall(false);
+    counter = doScan(table, scanInfo, counter + 2);
+
+    table.close();
+    connection.close();
+  }
+
+  int doScan(Table table, Scan scan, int expectedCount) throws IOException {
+    ResultScanner results = table.getScanner(scan);
+    results.next();
+    results.close();
+    return verifyCount(expectedCount);
+  }
+
+  int verifyCount(Integer counter) {
+    assertTrue(CountingRpcController.TABLE_PRIORITY.get() >= counter.intValue());
+    assertEquals(0, CountingRpcController.INT_PRIORITY.get());
+    return CountingRpcController.TABLE_PRIORITY.get() + 1;
+  }
+
+  @Test
+  public void testFallbackToDefaultRpcControllerFactory() {
+    Configuration conf = new Configuration(UTIL.getConfiguration());
+    conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, "foo.bar.Baz");
+
+    // Should not fail
+    RpcControllerFactory factory = RpcControllerFactory.instantiate(conf);
+    assertNotNull(factory);
+    assertEquals(factory.getClass(), RpcControllerFactory.class);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java
new file mode 100644
index 0000000..a9d10e8
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java
@@ -0,0 +1,117 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.ColumnAggregationService;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.SumRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.SumResponse;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+
+/**
+ * The aggregation implementation at a region.
+ */
+public class ColumnAggregationEndpoint extends ColumnAggregationService
+    implements Coprocessor, CoprocessorService {
+  private static final Log LOG = LogFactory.getLog(ColumnAggregationEndpoint.class);
+  private RegionCoprocessorEnvironment env = null;
+
+  @Override
+  public Service getService() {
+    return this;
+  }
+
+  @Override
+  public void start(CoprocessorEnvironment env) throws IOException {
+    if (env instanceof RegionCoprocessorEnvironment) {
+      this.env = (RegionCoprocessorEnvironment)env;
+      return;
+    }
+    throw new CoprocessorException("Must be loaded on a table region!");
+  }
+
+  @Override
+  public void stop(CoprocessorEnvironment env) throws IOException {
+    // Nothing to do.
+  }
+
+  @Override
+  public void sum(RpcController controller, SumRequest request, RpcCallback<SumResponse> done) {
+    // aggregate at each region
+    Scan scan = new Scan();
+    // Family is required in pb. Qualifier is not.
+    byte [] family = request.getFamily().toByteArray();
+    byte [] qualifier = request.hasQualifier()? request.getQualifier().toByteArray(): null;
+    if (request.hasQualifier()) {
+      scan.addColumn(family, qualifier);
+    } else {
+      scan.addFamily(family);
+    }
+    int sumResult = 0;
+    InternalScanner scanner = null;
+    try {
+      scanner = this.env.getRegion().getScanner(scan);
+      List<Cell> curVals = new ArrayList<Cell>();
+      boolean hasMore = false;
+      do {
+        curVals.clear();
+        hasMore = scanner.next(curVals);
+        for (Cell kv : curVals) {
+          if (CellUtil.matchingQualifier(kv, qualifier)) {
+            sumResult += Bytes.toInt(kv.getValueArray(), kv.getValueOffset());
+          }
+        }
+      } while (hasMore);
+    } catch (IOException e) {
+      CoprocessorRpcUtils.setControllerException(controller, e);
+      // Set result to -1 to indicate error.
+      sumResult = -1;
+      LOG.info("Setting sum result to -1 to indicate error", e);
+    } finally {
+      if (scanner != null) {
+        try {
+          scanner.close();
+        } catch (IOException e) {
+          CoprocessorRpcUtils.setControllerException(controller, e);
+          sumResult = -1;
+          LOG.info("Setting sum result to -1 to indicate error", e);
+        }
+      }
+    }
+    LOG.info("Returning result " + sumResult);
+    done.run(SumResponse.newBuilder().setSum(sumResult).build());
+  }
+}
\ No newline at end of file
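
Each ColumnAggregationEndpoint instance sums only the cells in its own region, so a caller
receives one partial sum per region and combines them client side. A hedged sketch of that
merge, using the same ServerRpcController/BlockingRpcCallback idiom as the tests in this module:

import java.io.IOException;
import java.util.Map;

import com.google.protobuf.ByteString;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.ColumnAggregationService;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.SumRequest;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.SumResponse;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.ServerRpcController;

public class ColumnAggregationSketch {
  static long sum(Table table, byte[] family, byte[] qualifier) throws Throwable {
    final SumRequest request = SumRequest.newBuilder()
        .setFamily(ByteString.copyFrom(family))
        .setQualifier(ByteString.copyFrom(qualifier))
        .build();
    Map<byte[], SumResponse> partials = table.coprocessorService(
        ColumnAggregationService.class, null, null,
        new Batch.Call<ColumnAggregationService, SumResponse>() {
          @Override
          public SumResponse call(ColumnAggregationService service) throws IOException {
            ServerRpcController controller = new ServerRpcController();
            CoprocessorRpcUtils.BlockingRpcCallback<SumResponse> callback =
                new CoprocessorRpcUtils.BlockingRpcCallback<>();
            service.sum(controller, request, callback);
            return callback.get();
          }
        });
    long total = 0;
    for (SumResponse partial : partials.values()) {
      total += partial.getSum();  // combine one partial sum per region
    }
    return total;
  }
}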

http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointNullResponse.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointNullResponse.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointNullResponse.java
new file mode 100644
index 0000000..22dac6d
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointNullResponse.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationNullResponseSumRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationNullResponseSumResponse;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationServiceNullResponse;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+
+/**
+ * Test coprocessor endpoint that always returns {@code null} for requests to the last region
+ * in the table.  This allows tests to provide assurance of correct {@code null} handling for
+ * response values.
+ */
+public class ColumnAggregationEndpointNullResponse
+    extends ColumnAggregationServiceNullResponse
+    implements Coprocessor, CoprocessorService {
+  private static final Log LOG = LogFactory.getLog(ColumnAggregationEndpointNullResponse.class);
+  private RegionCoprocessorEnvironment env = null;
+
+  @Override
+  public Service getService() {
+    return this;
+  }
+
+  @Override
+  public void start(CoprocessorEnvironment env) throws IOException {
+    if (env instanceof RegionCoprocessorEnvironment) {
+      this.env = (RegionCoprocessorEnvironment)env;
+      return;
+    }
+    throw new CoprocessorException("Must be loaded on a table region!");
+  }
+
+  @Override
+  public void stop(CoprocessorEnvironment env) throws IOException {
+    // Nothing to do.
+  }
+
+  @Override
+  public void sum(RpcController controller, ColumnAggregationNullResponseSumRequest request,
+       RpcCallback<ColumnAggregationNullResponseSumResponse> done) {
+    // aggregate at each region
+    Scan scan = new Scan();
+    // Family is required in pb. Qualifier is not.
+    byte[] family = request.getFamily().toByteArray();
+    byte[] qualifier = request.hasQualifier() ? request.getQualifier().toByteArray() : null;
+    if (request.hasQualifier()) {
+      scan.addColumn(family, qualifier);
+    } else {
+      scan.addFamily(family);
+    }
+    int sumResult = 0;
+    InternalScanner scanner = null;
+    try {
+      Region region = this.env.getRegion();
+      // for the last region in the table, return null to test null handling
+      if (Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW)) {
+        done.run(null);
+        return;
+      }
+      scanner = region.getScanner(scan);
+      List<Cell> curVals = new ArrayList<Cell>();
+      boolean hasMore = false;
+      do {
+        curVals.clear();
+        hasMore = scanner.next(curVals);
+        for (Cell kv : curVals) {
+          if (CellUtil.matchingQualifier(kv, qualifier)) {
+            sumResult += Bytes.toInt(kv.getValueArray(), kv.getValueOffset());
+          }
+        }
+      } while (hasMore);
+    } catch (IOException e) {
+      CoprocessorRpcUtils.setControllerException(controller, e);
+      // Set result to -1 to indicate error.
+      sumResult = -1;
+      LOG.info("Setting sum result to -1 to indicate error", e);
+    } finally {
+      if (scanner != null) {
+        try {
+          scanner.close();
+        } catch (IOException e) {
+          CoprocessorRpcUtils.setControllerException(controller, e);
+          sumResult = -1;
+          LOG.info("Setting sum result to -1 to indicate error", e);
+        }
+      }
+    }
+    done.run(ColumnAggregationNullResponseSumResponse.newBuilder().setSum(sumResult)
+      .build());
+    LOG.info("Returning sum " + sumResult + " for region " +
+        Bytes.toStringBinary(env.getRegion().getRegionInfo().getRegionName()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointWithErrors.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointWithErrors.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointWithErrors.java
new file mode 100644
index 0000000..c75fb31
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointWithErrors.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos.ColumnAggregationWithErrorsSumRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos.ColumnAggregationWithErrorsSumResponse;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+
+/**
+ * Test coprocessor endpoint that always throws a {@link DoNotRetryIOException} for requests on
+ * the last region in the table.  This allows tests to ensure correct error handling of
+ * coprocessor endpoints throwing exceptions.
+ */
+public class ColumnAggregationEndpointWithErrors
+    extends ColumnAggregationWithErrorsProtos.ColumnAggregationServiceWithErrors
+    implements Coprocessor, CoprocessorService {
+  private static final Log LOG = LogFactory.getLog(ColumnAggregationEndpointWithErrors.class);
+  private RegionCoprocessorEnvironment env = null;
+
+  @Override
+  public Service getService() {
+    return this;
+  }
+
+  @Override
+  public void start(CoprocessorEnvironment env) throws IOException {
+    if (env instanceof RegionCoprocessorEnvironment) {
+      this.env = (RegionCoprocessorEnvironment)env;
+      return;
+    }
+    throw new CoprocessorException("Must be loaded on a table region!");
+  }
+
+  @Override
+  public void stop(CoprocessorEnvironment env) throws IOException {
+    // Nothing to do.
+  }
+
+  @Override
+  public void sum(RpcController controller, ColumnAggregationWithErrorsSumRequest request, 
+      RpcCallback<ColumnAggregationWithErrorsSumResponse> done) {
+    // aggregate at each region
+    Scan scan = new Scan();
+    // Family is required in pb. Qualifier is not.
+    byte[] family = request.getFamily().toByteArray();
+    byte[] qualifier = request.hasQualifier() ? request.getQualifier().toByteArray() : null;
+    if (request.hasQualifier()) {
+      scan.addColumn(family, qualifier);
+    } else {
+      scan.addFamily(family);
+    }
+    int sumResult = 0;
+    InternalScanner scanner = null;
+    try {
+      Region region = this.env.getRegion();
+      // throw an exception for requests to the last region in the table, to test error handling
+      if (Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW)) {
+        throw new DoNotRetryIOException("An expected exception");
+      }
+      scanner = region.getScanner(scan);
+      List<Cell> curVals = new ArrayList<Cell>();
+      boolean hasMore = false;
+      do {
+        curVals.clear();
+        hasMore = scanner.next(curVals);
+        for (Cell kv : curVals) {
+          if (CellUtil.matchingQualifier(kv, qualifier)) {
+            sumResult += Bytes.toInt(kv.getValueArray(), kv.getValueOffset());
+          }
+        }
+      } while (hasMore);
+    } catch (IOException e) {
+      CoprocessorRpcUtils.setControllerException(controller, e);
+      // Set result to -1 to indicate error.
+      sumResult = -1;
+      LOG.info("Setting sum result to -1 to indicate error", e);
+    } finally {
+      if (scanner != null) {
+        try {
+          scanner.close();
+        } catch (IOException e) {
+          CoprocessorRpcUtils.setControllerException(controller, e);
+          sumResult = -1;
+          LOG.info("Setting sum result to -1 to indicate error", e);
+        }
+      }
+    }
+    done.run(ColumnAggregationWithErrorsSumResponse.newBuilder().setSum(sumResult).build());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java
new file mode 100644
index 0000000..5b7c1e9
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
+import org.apache.hadoop.hbase.util.Threads;
+
+import java.io.IOException;
+
+/**
+ * Test implementation of a coprocessor endpoint exposing the
+ * {@link TestRpcServiceProtos.TestProtobufRpcProto} service methods. For internal use by unit tests
+ * only.
+ */
+public class ProtobufCoprocessorService extends TestRpcServiceProtos.TestProtobufRpcProto
+    implements CoprocessorService, Coprocessor {
+  public ProtobufCoprocessorService() {
+  }
+
+  @Override
+  public Service getService() {
+    return this;
+  }
+
+  @Override
+  public void ping(RpcController controller, TestProtos.EmptyRequestProto request,
+      RpcCallback<TestProtos.EmptyResponseProto> done) {
+    done.run(TestProtos.EmptyResponseProto.getDefaultInstance());
+  }
+
+  @Override
+  public void echo(RpcController controller, TestProtos.EchoRequestProto request,
+      RpcCallback<TestProtos.EchoResponseProto> done) {
+    String message = request.getMessage();
+    done.run(TestProtos.EchoResponseProto.newBuilder().setMessage(message).build());
+  }
+
+  @Override
+  public void error(RpcController controller, TestProtos.EmptyRequestProto request,
+      RpcCallback<TestProtos.EmptyResponseProto> done) {
+    CoprocessorRpcUtils.setControllerException(controller, new IOException("Test exception"));
+    done.run(null);
+  }
+
+  @Override
+  public void pause(RpcController controller, PauseRequestProto request,
+      RpcCallback<EmptyResponseProto> done) {
+    Threads.sleepWithoutInterrupt(request.getMs());
+    done.run(EmptyResponseProto.getDefaultInstance());
+  }
+
+  @Override
+  public void addr(RpcController controller, EmptyRequestProto request,
+      RpcCallback<AddrResponseProto> done) {
+    done.run(AddrResponseProto.newBuilder().setAddr(RpcServer.getRemoteAddress().getHostAddress())
+        .build());
+  }
+
+  @Override
+  public void start(CoprocessorEnvironment env) throws IOException {
+    // Nothing to do.
+  }
+
+  @Override
+  public void stop(CoprocessorEnvironment env) throws IOException {
+    // Nothing to do.
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestBatchCoprocessorEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestBatchCoprocessorEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestBatchCoprocessorEndpoint.java
new file mode 100644
index 0000000..c023437
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestBatchCoprocessorEndpoint.java
@@ -0,0 +1,283 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.SumResponse;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos.ColumnAggregationWithErrorsSumRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos.ColumnAggregationWithErrorsSumResponse;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationNullResponseSumRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationNullResponseSumResponse;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationServiceNullResponse;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.ServiceException;
+
+/**
+ * Test cases verifying the batch execution of coprocessor endpoints.
+ */
+@Category({CoprocessorTests.class, MediumTests.class})
+public class TestBatchCoprocessorEndpoint {
+  private static final Log LOG = LogFactory.getLog(TestBatchCoprocessorEndpoint.class);
+
+  private static final TableName TEST_TABLE = TableName.valueOf("TestTable");
+  private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
+  private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier");
+  private static byte[] ROW = Bytes.toBytes("testRow");
+
+  private static final int ROWSIZE = 20;
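+  // Split points at rows 5 and 12 divide the 20 test rows into three regions:
+  // [00..04], [05..11], and [12..19].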
+  private static final int rowSeparator1 = 5;
+  private static final int rowSeparator2 = 12;
+  private static byte[][] ROWS = makeN(ROW, ROWSIZE);
+
+  private static HBaseTestingUtility util = new HBaseTestingUtility();
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    // Set the configuration to indicate which coprocessors should be loaded.
+    Configuration conf = util.getConfiguration();
+    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+        org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName(),
+        ProtobufCoprocessorService.class.getName(),
+        ColumnAggregationEndpointWithErrors.class.getName(),
+        ColumnAggregationEndpointNullResponse.class.getName());
+    conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
+        ProtobufCoprocessorService.class.getName());
+    util.startMiniCluster(2);
+    Admin admin = util.getHBaseAdmin();
+    HTableDescriptor desc = new HTableDescriptor(TEST_TABLE);
+    desc.addFamily(new HColumnDescriptor(TEST_FAMILY));
+    admin.createTable(desc, new byte[][]{ROWS[rowSeparator1], ROWS[rowSeparator2]});
+    util.waitUntilAllRegionsAssigned(TEST_TABLE);
+    admin.close();
+
+    Table table = util.getConnection().getTable(TEST_TABLE);
+    for (int i = 0; i < ROWSIZE; i++) {
+      Put put = new Put(ROWS[i]);
+      put.addColumn(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(i));
+      table.put(put);
+    }
+    table.close();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    util.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testAggregationNullResponse() throws Throwable {
+    Table table = util.getConnection().getTable(TEST_TABLE);
+    ColumnAggregationNullResponseSumRequest.Builder builder =
+        ColumnAggregationNullResponseSumRequest.newBuilder();
+    builder.setFamily(ByteString.copyFrom(TEST_FAMILY));
+    if (TEST_QUALIFIER != null && TEST_QUALIFIER.length > 0) {
+      builder.setQualifier(ByteString.copyFrom(TEST_QUALIFIER));
+    }
+    Map<byte[], ColumnAggregationNullResponseSumResponse> results =
+        table.batchCoprocessorService(
+            ColumnAggregationServiceNullResponse.getDescriptor().findMethodByName("sum"),
+            builder.build(), ROWS[0], ROWS[ROWS.length - 1],
+            ColumnAggregationNullResponseSumResponse.getDefaultInstance());
+
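+    // Only the first two regions (rows 00..11) are expected to respond; the
+    // null-response endpoint contributes no entry for the last region.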
+    int sumResult = 0;
+    int expectedResult = 0;
+    for (Map.Entry<byte[], ColumnAggregationNullResponseSumResponse> e :
+        results.entrySet()) {
+      LOG.info("Got value " + e.getValue().getSum() + " for region "
+          + Bytes.toStringBinary(e.getKey()));
+      sumResult += e.getValue().getSum();
+    }
+    for (int i = 0; i < rowSeparator2; i++) {
+      expectedResult += i;
+    }
+    assertEquals("Invalid result", expectedResult, sumResult);
+    table.close();
+  }
+
+  private static byte[][] makeN(byte[] base, int n) {
+    byte[][] ret = new byte[n][];
+    for (int i = 0; i < n; i++) {
+      ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%02d", i)));
+    }
+    return ret;
+  }
+
+  private Map<byte[], SumResponse> sum(final Table table, final byte[] family,
+      final byte[] qualifier, final byte[] start, final byte[] end) throws ServiceException,
+      Throwable {
+    ColumnAggregationProtos.SumRequest.Builder builder =
+        ColumnAggregationProtos.SumRequest.newBuilder();
+    builder.setFamily(ByteString.copyFrom(family));
+    if (qualifier != null && qualifier.length > 0) {
+      builder.setQualifier(ByteString.copyFrom(qualifier));
+    }
+    return table.batchCoprocessorService(
+        ColumnAggregationProtos.ColumnAggregationService.getDescriptor().findMethodByName("sum"),
+        builder.build(), start, end, ColumnAggregationProtos.SumResponse.getDefaultInstance());
+  }
+
+  @Test
+  public void testAggregationWithReturnValue() throws Throwable {
+    Table table = util.getConnection().getTable(TEST_TABLE);
+    Map<byte[], SumResponse> results = sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[0],
+        ROWS[ROWS.length - 1]);
+    int sumResult = 0;
+    int expectedResult = 0;
+    for (Map.Entry<byte[], SumResponse> e : results.entrySet()) {
+      LOG.info("Got value " + e.getValue().getSum() + " for region "
+          + Bytes.toStringBinary(e.getKey()));
+      sumResult += e.getValue().getSum();
+    }
+    for (int i = 0; i < ROWSIZE; i++) {
+      expectedResult += i;
+    }
+    assertEquals("Invalid result", expectedResult, sumResult);
+
+    results.clear();
+
+    // Sum over only the last two regions (rows 05..19).
+    results = sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[rowSeparator1],
+        ROWS[ROWS.length - 1]);
+    sumResult = 0;
+    expectedResult = 0;
+    for (Map.Entry<byte[], SumResponse> e : results.entrySet()) {
+      LOG.info("Got value " + e.getValue().getSum() + " for region "
+          + Bytes.toStringBinary(e.getKey()));
+      sumResult += e.getValue().getSum();
+    }
+    for (int i = rowSeparator1; i < ROWSIZE; i++) {
+      expectedResult += i;
+    }
+    assertEquals("Invalid result", expectedResult, sumResult);
+    table.close();
+  }
+
+  @Test
+  public void testAggregation() throws Throwable {
+    Table table = util.getConnection().getTable(TEST_TABLE);
+    Map<byte[], SumResponse> results = sum(table, TEST_FAMILY, TEST_QUALIFIER,
+        ROWS[0], ROWS[ROWS.length - 1]);
+    int sumResult = 0;
+    int expectedResult = 0;
+    for (Map.Entry<byte[], SumResponse> e : results.entrySet()) {
+      LOG.info("Got value " + e.getValue().getSum() + " for region "
+          + Bytes.toStringBinary(e.getKey()));
+      sumResult += e.getValue().getSum();
+    }
+    for (int i = 0; i < ROWSIZE; i++) {
+      expectedResult += i;
+    }
+    assertEquals("Invalid result", expectedResult, sumResult);
+
+    // Sum over only the last two regions (rows 05..19).
+    results = sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[rowSeparator1], ROWS[ROWS.length - 1]);
+    sumResult = 0;
+    expectedResult = 0;
+    for (Map.Entry<byte[], SumResponse> e : results.entrySet()) {
+      LOG.info("Got value " + e.getValue().getSum() + " for region "
+          + Bytes.toStringBinary(e.getKey()));
+      sumResult += e.getValue().getSum();
+    }
+    for (int i = rowSeparator1; i < ROWSIZE; i++) {
+      expectedResult += i;
+    }
+    assertEquals("Invalid result", expectedResult, sumResult);
+    table.close();
+  }
+
+  @Test
+  public void testAggregationWithErrors() throws Throwable {
+    Table table = util.getConnection().getTable(TEST_TABLE);
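+    // Callbacks may fire concurrently from the client's pool, so the results map
+    // is synchronized; byte[] keys need Bytes.BYTES_COMPARATOR for ordering.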
+    final Map<byte[], ColumnAggregationWithErrorsSumResponse> results =
+        Collections.synchronizedMap(
+            new TreeMap<byte[], ColumnAggregationWithErrorsSumResponse>(
+                Bytes.BYTES_COMPARATOR
+            ));
+    ColumnAggregationWithErrorsSumRequest.Builder builder =
+        ColumnAggregationWithErrorsSumRequest.newBuilder();
+    builder.setFamily(ByteString.copyFrom(TEST_FAMILY));
+    if (TEST_QUALIFIER != null && TEST_QUALIFIER.length > 0) {
+      builder.setQualifier(ByteString.copyFrom(TEST_QUALIFIER));
+    }
+
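+    // The errors endpoint fails for at least one region: the call should raise
+    // an exception while partial sums from the other regions still arrive
+    // through the callback below.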
+    boolean hasError = false;
+    try {
+      table.batchCoprocessorService(
+          ColumnAggregationWithErrorsProtos.ColumnAggregationServiceWithErrors.getDescriptor()
+              .findMethodByName("sum"),
+          builder.build(), ROWS[0], ROWS[ROWS.length - 1],
+          ColumnAggregationWithErrorsSumResponse.getDefaultInstance(),
+          new Batch.Callback<ColumnAggregationWithErrorsSumResponse>() {
+
+            @Override
+            public void update(byte[] region, byte[] row,
+                ColumnAggregationWithErrorsSumResponse result) {
+              results.put(region, result);
+            }
+          });
+    } catch (Throwable t) {
+      LOG.info("Exceptions in coprocessor service", t);
+      hasError = true;
+    }
+
+    int sumResult = 0;
+    int expectedResult = 0;
+    for (Map.Entry<byte[], ColumnAggregationWithErrorsSumResponse> e : results.entrySet()) {
+      LOG.info("Got value " + e.getValue().getSum() + " for region "
+          + Bytes.toStringBinary(e.getKey()));
+      sumResult += e.getValue().getSum();
+    }
+    for (int i = 0; i < rowSeparator2; i++) {
+      expectedResult += i;
+    }
+    assertEquals("Invalid result", expectedResult, sumResult);
+    assertTrue(hasError);
+    table.close();
+  }
+}
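For reference, the callback form exercised by testAggregationWithErrors
distills to the sketch below; 'MyService', 'MyResponse', and the surrounding
'table', 'request', 'startRow', and 'endRow' variables are hypothetical
placeholders, not part of this patch:

    final Map<byte[], MyResponse> results = Collections.synchronizedMap(
        new TreeMap<byte[], MyResponse>(Bytes.BYTES_COMPARATOR));
    table.batchCoprocessorService(
        MyService.getDescriptor().findMethodByName("sum"), // method run per region
        request, startRow, endRow,                         // key range picks the regions
        MyResponse.getDefaultInstance(),                    // prototype used to parse responses
        new Batch.Callback<MyResponse>() {
          @Override
          public void update(byte[] region, byte[] row, MyResponse result) {
            results.put(region, result);                    // may be invoked concurrently
          }
        });

If any region's invocation throws, the call as a whole surfaces the error, but
results delivered before the failure remain in the map, which is exactly what
testAggregationWithErrors asserts.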