You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2016/10/07 19:14:59 UTC
[31/77] [abbrv] [partial] hbase git commit: HBASE-15638 Shade
protobuf Which includes
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
new file mode 100644
index 0000000..1849d90
--- /dev/null
+++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.access;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
+import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadHFilesRequest;
+import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadHFilesResponse;
+import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadService;
+import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+
+/**
+ * Coprocessor service for bulk loads in secure mode.
+ * <p>
+ * Every RPC re-encodes the incoming (unshaded) protobuf request as its shaded counterpart and
+ * hands it to the {@link SecureBulkLoadManager} obtained from the region server services.
+ * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0
+ */
+@InterfaceAudience.Private
+@Deprecated
+public class SecureBulkLoadEndpoint extends SecureBulkLoadService
+    implements CoprocessorService, Coprocessor {
+
+  public static final long VERSION = 0L;
+
+  private static final Log LOG = LogFactory.getLog(SecureBulkLoadEndpoint.class);
+
+  /** Environment handed to {@link #start(CoprocessorEnvironment)}; used by every RPC. */
+  private RegionCoprocessorEnvironment env;
+
+  /**
+   * Stores the environment and logs two deprecation warnings.
+   * @param env the coprocessor environment; it is cast to {@link RegionCoprocessorEnvironment}
+   */
+  @Override
+  public void start(CoprocessorEnvironment env) {
+    this.env = (RegionCoprocessorEnvironment)env;
+    LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in future releases.");
+    LOG.warn("Secure bulk load has been integrated into HBase core.");
+  }
+
+  /**
+   * No-op; this endpoint holds nothing that needs releasing.
+   * @param env the coprocessor environment (unused)
+   */
+  @Override
+  public void stop(CoprocessorEnvironment env) throws IOException {
+  }
+
+  /**
+   * Delegates to {@link SecureBulkLoadManager} to prepare a bulk load and reports the resulting
+   * bulk token.
+   * <p>
+   * The callback is invoked exactly once: with the response on success, or with {@code null}
+   * after the failure has been recorded on the controller.
+   * @param controller receives the exception if the call fails
+   * @param request the unshaded prepare request
+   * @param done callback that receives the response
+   */
+  @Override
+  public void prepareBulkLoad(RpcController controller, PrepareBulkLoadRequest request,
+      RpcCallback<PrepareBulkLoadResponse> done) {
+    PrepareBulkLoadResponse response = null;
+    try {
+      SecureBulkLoadManager secureBulkLoadManager =
+          this.env.getRegionServerServices().getSecureBulkLoadManager();
+      String bulkToken = secureBulkLoadManager.prepareBulkLoad(this.env.getRegion(),
+          convert(request));
+      response = PrepareBulkLoadResponse.newBuilder().setBulkToken(bulkToken).build();
+    } catch (IOException e) {
+      CoprocessorRpcUtils.setControllerException(controller, e);
+    }
+    // Single completion point: previously a successful call ran the callback twice
+    // (once with the response and once more with null).
+    done.run(response);
+  }
+
+  /**
+   * Re-encodes an unshaded {@code PrepareBulkLoadRequest} as the shaded type by serializing it
+   * to bytes and merging those bytes into a shaded builder.
+   * @param request the unshaded request
+   * @return the equivalent shaded request
+   * @throws org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException
+   *           if the serialized bytes cannot be parsed by the shaded builder
+   */
+  org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest
+      convert(PrepareBulkLoadRequest request)
+      throws org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException {
+    byte [] bytes = request.toByteArray();
+    org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest.Builder
+        builder =
+        org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest.
+            newBuilder();
+    builder.mergeFrom(bytes);
+    return builder.build();
+  }
+
+  /**
+   * Delegates to {@link SecureBulkLoadManager} to clean up after a bulk load.
+   * <p>
+   * The callback is invoked exactly once: with an empty response on success, or with
+   * {@code null} after the failure has been recorded on the controller.
+   * @param controller receives the exception if the call fails
+   * @param request the unshaded cleanup request
+   * @param done callback that receives the response
+   */
+  @Override
+  public void cleanupBulkLoad(RpcController controller, CleanupBulkLoadRequest request,
+      RpcCallback<CleanupBulkLoadResponse> done) {
+    CleanupBulkLoadResponse response = null;
+    try {
+      SecureBulkLoadManager secureBulkLoadManager =
+          this.env.getRegionServerServices().getSecureBulkLoadManager();
+      secureBulkLoadManager.cleanupBulkLoad(this.env.getRegion(), convert(request));
+      response = CleanupBulkLoadResponse.newBuilder().build();
+    } catch (IOException e) {
+      CoprocessorRpcUtils.setControllerException(controller, e);
+    }
+    // Single completion point: previously a successful call ran the callback twice.
+    done.run(response);
+  }
+
+  /**
+   * Re-encodes an unshaded {@code CleanupBulkLoadRequest} as the shaded type by serializing it
+   * to bytes and merging those bytes into a shaded builder.
+   * @param request the unshaded request
+   * @return the equivalent shaded request
+   * @throws org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException
+   *           if the serialized bytes cannot be parsed by the shaded builder
+   */
+  org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest
+      convert(CleanupBulkLoadRequest request)
+      throws org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException {
+    byte [] bytes = request.toByteArray();
+    org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest.Builder
+        builder =
+        org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest.
+            newBuilder();
+    builder.mergeFrom(bytes);
+    return builder.build();
+  }
+
+  /**
+   * Rewrites the request as a {@code BulkLoadHFileRequest} for this region and delegates the
+   * load to {@link SecureBulkLoadManager}.
+   * <p>
+   * The callback always receives a response; its {@code loaded} flag stays {@code false} when
+   * the call fails, in which case the exception is also recorded on the controller.
+   * @param controller receives the exception if the call fails
+   * @param request the secure bulk load request
+   * @param done callback that receives the response
+   */
+  @Override
+  public void secureBulkLoadHFiles(RpcController controller, SecureBulkLoadHFilesRequest request,
+      RpcCallback<SecureBulkLoadHFilesResponse> done) {
+    boolean loaded = false;
+    try {
+      SecureBulkLoadManager secureBulkLoadManager =
+          this.env.getRegionServerServices().getSecureBulkLoadManager();
+      BulkLoadHFileRequest bulkLoadHFileRequest = convertSecureBulkLoadHFilesRequest(request);
+      loaded = secureBulkLoadManager.secureBulkLoadHFiles(this.env.getRegion(),
+          convert(bulkLoadHFileRequest));
+    } catch (IOException e) {
+      CoprocessorRpcUtils.setControllerException(controller, e);
+    }
+    done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(loaded).build());
+  }
+
+  /**
+   * Re-encodes an unshaded {@code BulkLoadHFileRequest} as the shaded type by serializing it
+   * to bytes and merging those bytes into a shaded builder.
+   * @param request the unshaded request
+   * @return the equivalent shaded request
+   * @throws org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException
+   *           if the serialized bytes cannot be parsed by the shaded builder
+   */
+  org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest
+      convert(BulkLoadHFileRequest request)
+      throws org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException {
+    byte [] bytes = request.toByteArray();
+    org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.Builder
+        builder =
+        org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.
+            newBuilder();
+    builder.mergeFrom(bytes);
+    return builder.build();
+  }
+
+  /**
+   * Builds a {@code BulkLoadHFileRequest} from a {@code SecureBulkLoadHFilesRequest}, copying the
+   * filesystem token, bulk token, assign-sequence-number flag and family paths, and addressing
+   * it to this coprocessor's region by region name.
+   * @param request the secure bulk load request
+   * @return the equivalent (unshaded) bulk load request
+   */
+  private BulkLoadHFileRequest convertSecureBulkLoadHFilesRequest(
+      SecureBulkLoadHFilesRequest request) {
+    BulkLoadHFileRequest.Builder bulkLoadHFileRequest = BulkLoadHFileRequest.newBuilder();
+    RegionSpecifier region =
+        ProtobufUtil.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, this.env
+            .getRegionInfo().getRegionName());
+    bulkLoadHFileRequest.setRegion(region).setFsToken(request.getFsToken())
+        .setBulkToken(request.getBulkToken()).setAssignSeqNum(request.getAssignSeqNum())
+        .addAllFamilyPath(request.getFamilyPathList());
+    return bulkLoadHFileRequest.build();
+  }
+
+  /**
+   * @return this instance, which is itself the protobuf {@link Service} implementation
+   */
+  @Override
+  public Service getService() {
+    return this;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/main/protobuf/Aggregate.proto
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/main/protobuf/Aggregate.proto b/hbase-endpoint/src/main/protobuf/Aggregate.proto
new file mode 100644
index 0000000..4d32e70
--- /dev/null
+++ b/hbase-endpoint/src/main/protobuf/Aggregate.proto
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package hbase.pb;
+
+option java_package = "org.apache.hadoop.hbase.protobuf.generated";
+option java_outer_classname = "AggregateProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+import "Client.proto";
+
+// Request message taken by every AggregateService RPC declared below.
+message AggregateRequest {
+ /** The request passed to the AggregateService consists of three parts
+ * (1) the (canonical) classname of the ColumnInterpreter implementation
+ * (2) the Scan query
+ * (3) any bytes required to construct the ColumnInterpreter object
+ * properly
+ */
+ required string interpreter_class_name = 1;
+ // Scan is not declared in this file; presumably it comes from the imported Client.proto.
+ required Scan scan = 2;
+ optional bytes interpreter_specific_bytes = 3;
+}
+
+// Response message returned by every AggregateService RPC declared below.
+message AggregateResponse {
+ /**
+ * The AggregateService methods all have a response that either is a Pair
+ * or a simple object. When it is a Pair both first_part and second_part
+ * have defined values (and the second_part is not present in the response
+ * when the response is not a pair). Refer to the AggregateImplementation
+ * class for an overview of the AggregateResponse object constructions.
+ */
+ repeated bytes first_part = 1;
+ optional bytes second_part = 2;
+}
+
+/** Refer to the AggregateImplementation class for an overview of the
+ * AggregateService method implementations and their functionality.
+ */
+service AggregateService {
+ // Every RPC takes an AggregateRequest and returns an AggregateResponse.
+ rpc GetMax (AggregateRequest) returns (AggregateResponse);
+ rpc GetMin (AggregateRequest) returns (AggregateResponse);
+ rpc GetSum (AggregateRequest) returns (AggregateResponse);
+ rpc GetRowNum (AggregateRequest) returns (AggregateResponse);
+ rpc GetAvg (AggregateRequest) returns (AggregateResponse);
+ rpc GetStd (AggregateRequest) returns (AggregateResponse);
+ rpc GetMedian (AggregateRequest) returns (AggregateResponse);
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/main/protobuf/BulkDelete.proto
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/main/protobuf/BulkDelete.proto b/hbase-endpoint/src/main/protobuf/BulkDelete.proto
new file mode 100644
index 0000000..c2ec8ca
--- /dev/null
+++ b/hbase-endpoint/src/main/protobuf/BulkDelete.proto
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package hbase.pb;
+
+option java_package = "org.apache.hadoop.hbase.coprocessor.example.generated";
+option java_outer_classname = "BulkDeleteProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+import "Client.proto";
+
+// Request message for BulkDeleteService.delete.
+message BulkDeleteRequest {
+ // Scan is not declared in this file; presumably it comes from the imported Client.proto.
+ required Scan scan = 1;
+ required DeleteType deleteType = 2;
+ optional uint64 timestamp = 3;
+ // NOTE(review): presumably the number of rows handled per batch - confirm against the
+ // endpoint implementation.
+ required uint32 rowBatchSize = 4;
+
+ // NOTE(review): presumably selects the granularity of what gets deleted - confirm against
+ // the endpoint implementation.
+ enum DeleteType {
+ ROW = 0;
+ FAMILY = 1;
+ COLUMN = 2;
+ VERSION = 3;
+ }
+}
+
+// Response message for BulkDeleteService.delete; rowsDeleted is required while
+// versionsDeleted is optional and may be absent.
+message BulkDeleteResponse {
+ required uint64 rowsDeleted = 1;
+ optional uint64 versionsDeleted = 2;
+}
+
+// Service with a single RPC mapping a BulkDeleteRequest to a BulkDeleteResponse.
+service BulkDeleteService {
+ rpc delete(BulkDeleteRequest)
+ returns (BulkDeleteResponse);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/main/protobuf/ColumnAggregationNullResponseProtocol.proto
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/main/protobuf/ColumnAggregationNullResponseProtocol.proto b/hbase-endpoint/src/main/protobuf/ColumnAggregationNullResponseProtocol.proto
new file mode 100644
index 0000000..b4dc01e
--- /dev/null
+++ b/hbase-endpoint/src/main/protobuf/ColumnAggregationNullResponseProtocol.proto
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// Coprocessor test
+option java_package = "org.apache.hadoop.hbase.coprocessor.protobuf.generated";
+option java_outer_classname = "ColumnAggregationWithNullResponseProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+// use unique names for messages in ColumnAggregationXXX.protos due to a bug in
+// protoc or hadoop's protoc compiler.
+// Request message for ColumnAggregationServiceNullResponse.sum: a required column family
+// and an optional qualifier.
+message ColumnAggregationNullResponseSumRequest {
+ required bytes family = 1;
+ optional bytes qualifier = 2;
+}
+
+// Response message for ColumnAggregationServiceNullResponse.sum. The sum field is optional
+// here, so a response may be sent without it.
+message ColumnAggregationNullResponseSumResponse {
+ optional int64 sum = 1;
+}
+
+// Service with a single sum RPC over the two messages above.
+service ColumnAggregationServiceNullResponse {
+ rpc sum(ColumnAggregationNullResponseSumRequest)
+ returns(ColumnAggregationNullResponseSumResponse);
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/main/protobuf/ColumnAggregationProtocol.proto
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/main/protobuf/ColumnAggregationProtocol.proto b/hbase-endpoint/src/main/protobuf/ColumnAggregationProtocol.proto
new file mode 100644
index 0000000..ad1acda
--- /dev/null
+++ b/hbase-endpoint/src/main/protobuf/ColumnAggregationProtocol.proto
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// Coprocessor test
+option java_package = "org.apache.hadoop.hbase.coprocessor.protobuf.generated";
+option java_outer_classname = "ColumnAggregationProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+// Request message for ColumnAggregationService.sum: a required column family and an
+// optional qualifier.
+message SumRequest {
+ required bytes family = 1;
+ optional bytes qualifier = 2;
+}
+
+// Response message for ColumnAggregationService.sum; the sum field is required.
+message SumResponse {
+ required int64 sum = 1;
+}
+
+// Service with a single sum RPC mapping a SumRequest to a SumResponse.
+service ColumnAggregationService {
+ rpc sum(SumRequest) returns(SumResponse);
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/main/protobuf/ColumnAggregationWithErrorsProtocol.proto
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/main/protobuf/ColumnAggregationWithErrorsProtocol.proto b/hbase-endpoint/src/main/protobuf/ColumnAggregationWithErrorsProtocol.proto
new file mode 100644
index 0000000..7808949
--- /dev/null
+++ b/hbase-endpoint/src/main/protobuf/ColumnAggregationWithErrorsProtocol.proto
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// Coprocessor test
+option java_package = "org.apache.hadoop.hbase.coprocessor.protobuf.generated";
+option java_outer_classname = "ColumnAggregationWithErrorsProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+// use unique names for messages in ColumnAggregationXXX.protos due to a bug in
+// protoc or hadoop's protoc compiler.
+// Request message for ColumnAggregationServiceWithErrors.sum: a required column family
+// and an optional qualifier.
+message ColumnAggregationWithErrorsSumRequest {
+ required bytes family = 1;
+ optional bytes qualifier = 2;
+}
+
+// Response message for ColumnAggregationServiceWithErrors.sum; the sum field is required.
+message ColumnAggregationWithErrorsSumResponse {
+ required int64 sum = 1;
+}
+
+// Service with a single sum RPC over the two messages above.
+service ColumnAggregationServiceWithErrors {
+ rpc sum(ColumnAggregationWithErrorsSumRequest)
+ returns(ColumnAggregationWithErrorsSumResponse);
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/main/protobuf/DummyRegionServerEndpoint.proto
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/main/protobuf/DummyRegionServerEndpoint.proto b/hbase-endpoint/src/main/protobuf/DummyRegionServerEndpoint.proto
new file mode 100644
index 0000000..539f7da
--- /dev/null
+++ b/hbase-endpoint/src/main/protobuf/DummyRegionServerEndpoint.proto
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package hbase.test.pb;
+
+// Coprocessor test
+option java_package = "org.apache.hadoop.hbase.coprocessor.protobuf.generated";
+option java_outer_classname = "DummyRegionServerEndpointProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+// Empty request message shared by both DummyService RPCs.
+message DummyRequest {
+}
+
+// Response message shared by both DummyService RPCs; carries one required string.
+message DummyResponse {
+ required string value = 1;
+}
+
+// Service with two RPCs that share the same request and response types.
+// NOTE(review): dummyThrow presumably exists to exercise the error path - confirm against
+// the endpoint implementation.
+service DummyService {
+ rpc dummyCall(DummyRequest) returns(DummyResponse);
+ rpc dummyThrow(DummyRequest) returns(DummyResponse);
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/main/protobuf/IncrementCounterProcessor.proto
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/main/protobuf/IncrementCounterProcessor.proto b/hbase-endpoint/src/main/protobuf/IncrementCounterProcessor.proto
new file mode 100644
index 0000000..b8c77ca
--- /dev/null
+++ b/hbase-endpoint/src/main/protobuf/IncrementCounterProcessor.proto
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.hbase.coprocessor.protobuf.generated";
+option java_outer_classname = "IncrementCounterProcessorTestProtos";
+option java_generate_equals_and_hash = true;
+
+// Four request/response message pairs follow; this file declares no service for them.
+// NOTE(review): presumably consumed by row-processor tests (the outer class is named
+// IncrementCounterProcessorTestProtos) - confirm against the users of these messages.
+
+// Request half of the IncCounterProcessor pair: a row key and a counter value.
+message IncCounterProcessorRequest {
+ required bytes row = 1;
+ required int32 counter = 2;
+}
+
+// Response half of the IncCounterProcessor pair: a single required int32.
+message IncCounterProcessorResponse {
+ required int32 response = 1;
+}
+
+// Request half of the FriendsOfFriendsProcessor pair; note that the request itself also
+// carries a repeated result field.
+message FriendsOfFriendsProcessorRequest {
+ required bytes person = 1;
+ required bytes row = 2;
+ repeated string result = 3;
+}
+
+// Response half of the FriendsOfFriendsProcessor pair: zero or more result strings.
+message FriendsOfFriendsProcessorResponse {
+ repeated string result = 1;
+}
+
+// Request half of the RowSwapProcessor pair: two row keys.
+message RowSwapProcessorRequest {
+ required bytes row1 = 1;
+ required bytes row2 = 2;
+}
+
+// Response half of the RowSwapProcessor pair; carries no fields.
+message RowSwapProcessorResponse {
+}
+
+// Request half of the TimeoutProcessor pair: a single row key.
+message TimeoutProcessorRequest {
+ required bytes row = 1;
+}
+
+// Response half of the TimeoutProcessor pair; carries no fields.
+message TimeoutProcessorResponse {
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/main/protobuf/SecureBulkLoad.proto
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/main/protobuf/SecureBulkLoad.proto b/hbase-endpoint/src/main/protobuf/SecureBulkLoad.proto
new file mode 100644
index 0000000..d86d162
--- /dev/null
+++ b/hbase-endpoint/src/main/protobuf/SecureBulkLoad.proto
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package hbase.pb;
+
+option java_package = "org.apache.hadoop.hbase.protobuf.generated";
+option java_outer_classname = "SecureBulkLoadProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+import 'Client.proto';
+
+// Request message for SecureBulkLoadService.SecureBulkLoadHFiles.
+// BulkLoadHFileRequest.FamilyPath and DelegationToken are not declared in this file;
+// presumably they come from the imported Client.proto.
+message SecureBulkLoadHFilesRequest {
+ repeated BulkLoadHFileRequest.FamilyPath family_path = 1;
+ optional bool assign_seq_num = 2;
+ required DelegationToken fs_token = 3;
+ required string bulk_token = 4;
+}
+
+// Response message for SecureBulkLoadService.SecureBulkLoadHFiles; carries one required flag.
+message SecureBulkLoadHFilesResponse {
+ required bool loaded = 1;
+}
+
+// Service with three RPCs. The Prepare*/Cleanup* request and response types are not declared
+// in this file; presumably they come from the imported Client.proto.
+service SecureBulkLoadService {
+ rpc PrepareBulkLoad(PrepareBulkLoadRequest)
+ returns (PrepareBulkLoadResponse);
+
+ rpc SecureBulkLoadHFiles(SecureBulkLoadHFilesRequest)
+ returns (SecureBulkLoadHFilesResponse);
+
+ rpc CleanupBulkLoad(CleanupBulkLoadRequest)
+ returns (CleanupBulkLoadResponse);
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
new file mode 100644
index 0000000..aac020d
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScannable;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.ProtobufCoprocessorService;
+import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({MediumTests.class, ClientTests.class})
+public class TestRpcControllerFactory {
+
+ public static class StaticRpcControllerFactory extends RpcControllerFactory {
+
+ public StaticRpcControllerFactory(Configuration conf) {
+ super(conf);
+ }
+
+ @Override
+ public HBaseRpcController newController() {
+ return new CountingRpcController(super.newController());
+ }
+
+ @Override
+ public HBaseRpcController newController(final CellScanner cellScanner) {
+ return new CountingRpcController(super.newController(cellScanner));
+ }
+
+ @Override
+ public HBaseRpcController newController(final List<CellScannable> cellIterables) {
+ return new CountingRpcController(super.newController(cellIterables));
+ }
+ }
+
+ public static class CountingRpcController extends DelegatingHBaseRpcController {
+
+ private static AtomicInteger INT_PRIORITY = new AtomicInteger();
+ private static AtomicInteger TABLE_PRIORITY = new AtomicInteger();
+
+ public CountingRpcController(HBaseRpcController delegate) {
+ super(delegate);
+ }
+
+ @Override
+ public void setPriority(int priority) {
+ super.setPriority(priority);
+ INT_PRIORITY.incrementAndGet();
+ }
+
+ @Override
+ public void setPriority(TableName tn) {
+ super.setPriority(tn);
+ // ignore counts for system tables - it could change and we really only want to check on what
+ // the client should change
+ if (!tn.isSystemTable()) {
+ TABLE_PRIORITY.incrementAndGet();
+ }
+
+ }
+ }
+
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ // load an endpoint so we have an endpoint to test - it doesn't matter which one, but
+ // this is already in tests, so we can just use it.
+ Configuration conf = UTIL.getConfiguration();
+ conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+ ProtobufCoprocessorService.class.getName());
+
+ UTIL.startMiniCluster();
+ }
+
+ @AfterClass
+ public static void teardown() throws Exception {
+ UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * check some of the methods and make sure we are incrementing each time. Its a bit tediuous to
+ * cover all methods here and really is a bit brittle since we can always add new methods but
+ * won't be sure to add them here. So we just can cover the major ones.
+ * @throws Exception on failure
+ */
+ @Test
+ public void testCountController() throws Exception {
+ Configuration conf = new Configuration(UTIL.getConfiguration());
+ // setup our custom controller
+ conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
+ StaticRpcControllerFactory.class.getName());
+
+ TableName name = TableName.valueOf("testcustomcontroller");
+ UTIL.createTable(name, fam1).close();
+
+ // change one of the connection properties so we get a new Connection with our configuration
+ conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT + 1);
+
+ Connection connection = ConnectionFactory.createConnection(conf);
+ Table table = connection.getTable(name);
+ byte[] row = Bytes.toBytes("row");
+ Put p = new Put(row);
+ p.addColumn(fam1, fam1, Bytes.toBytes("val0"));
+ table.put(p);
+
+ Integer counter = 1;
+ counter = verifyCount(counter);
+
+ Delete d = new Delete(row);
+ d.addColumn(fam1, fam1);
+ table.delete(d);
+ counter = verifyCount(counter);
+
+ Put p2 = new Put(row);
+ p2.addColumn(fam1, Bytes.toBytes("qual"), Bytes.toBytes("val1"));
+ table.batch(Lists.newArrayList(p, p2), null);
+ // this only goes to a single server, so we don't need to change the count here
+ counter = verifyCount(counter);
+
+ Append append = new Append(row);
+ append.add(fam1, fam1, Bytes.toBytes("val2"));
+ table.append(append);
+ counter = verifyCount(counter);
+
+ // and check the major lookup calls as well
+ Get g = new Get(row);
+ table.get(g);
+ counter = verifyCount(counter);
+
+ ResultScanner scan = table.getScanner(fam1);
+ scan.next();
+ scan.close();
+ counter = verifyCount(counter + 2);
+
+ Get g2 = new Get(row);
+ table.get(Lists.newArrayList(g, g2));
+ // same server, so same as above for not changing count
+ counter = verifyCount(counter);
+
+ // make sure all the scanner types are covered
+ Scan scanInfo = new Scan(row);
+ // regular small
+ scanInfo.setSmall(true);
+ counter = doScan(table, scanInfo, counter);
+
+ // reversed, small
+ scanInfo.setReversed(true);
+ counter = doScan(table, scanInfo, counter);
+
+ // reversed, regular
+ scanInfo.setSmall(false);
+ counter = doScan(table, scanInfo, counter + 2);
+
+ table.close();
+ connection.close();
+ }
+
+ int doScan(Table table, Scan scan, int expectedCount) throws IOException {
+ ResultScanner results = table.getScanner(scan);
+ results.next();
+ results.close();
+ return verifyCount(expectedCount);
+ }
+
+ int verifyCount(Integer counter) {
+ assertTrue(CountingRpcController.TABLE_PRIORITY.get() >= counter.intValue());
+ assertEquals(0, CountingRpcController.INT_PRIORITY.get());
+ return CountingRpcController.TABLE_PRIORITY.get() + 1;
+ }
+
+ @Test
+ public void testFallbackToDefaultRpcControllerFactory() {
+ Configuration conf = new Configuration(UTIL.getConfiguration());
+ conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, "foo.bar.Baz");
+
+ // Should not fail
+ RpcControllerFactory factory = RpcControllerFactory.instantiate(conf);
+ assertNotNull(factory);
+ assertEquals(factory.getClass(), RpcControllerFactory.class);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java
new file mode 100644
index 0000000..a9d10e8
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java
@@ -0,0 +1,117 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.ColumnAggregationService;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.SumRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.SumResponse;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+
+
+/**
+ * Coprocessor endpoint that sums, for the region it is loaded on, the int values stored in a
+ * requested column family (optionally narrowed to a single qualifier).
+ */
+public class ColumnAggregationEndpoint extends ColumnAggregationService
+    implements Coprocessor, CoprocessorService {
+  private static final Log LOG = LogFactory.getLog(ColumnAggregationEndpoint.class);
+  private RegionCoprocessorEnvironment env = null;
+
+  /** Returns this object, which is itself the protobuf service implementation. */
+  @Override
+  public Service getService() {
+    return this;
+  }
+
+  /**
+   * Stores the region environment for later use by {@link #sum}.
+   *
+   * @throws CoprocessorException if {@code env} is not a {@link RegionCoprocessorEnvironment}
+   */
+  @Override
+  public void start(CoprocessorEnvironment env) throws IOException {
+    if (!(env instanceof RegionCoprocessorEnvironment)) {
+      throw new CoprocessorException("Must be loaded on a table region!");
+    }
+    this.env = (RegionCoprocessorEnvironment) env;
+  }
+
+  @Override
+  public void stop(CoprocessorEnvironment env) throws IOException {
+    // Nothing was acquired in start(), so there is nothing to release.
+  }
+
+  /**
+   * Scans this region for the requested family (and qualifier, when one is supplied), adds up
+   * the int decoded from the start of every matching cell value and hands the total to
+   * {@code done}. If the scan or the scanner close throws an {@link IOException}, the exception
+   * is set on {@code controller} and the reported sum is -1.
+   */
+  @Override
+  public void sum(RpcController controller, SumRequest request, RpcCallback<SumResponse> done) {
+    // The pb message makes the family mandatory; the qualifier is optional.
+    final byte[] family = request.getFamily().toByteArray();
+    final byte[] qualifier;
+    final Scan scan = new Scan();
+    if (request.hasQualifier()) {
+      qualifier = request.getQualifier().toByteArray();
+      scan.addColumn(family, qualifier);
+    } else {
+      qualifier = null;
+      scan.addFamily(family);
+    }
+
+    int total = 0;
+    InternalScanner regionScanner = null;
+    try {
+      regionScanner = this.env.getRegion().getScanner(scan);
+      List<Cell> batch = new ArrayList<Cell>();
+      boolean moreRows;
+      do {
+        batch.clear();
+        moreRows = regionScanner.next(batch);
+        for (Cell cell : batch) {
+          // NOTE(review): qualifier is null for family-only requests, so the outcome then
+          // depends on how CellUtil.matchingQualifier treats null - confirm that is intended.
+          if (!CellUtil.matchingQualifier(cell, qualifier)) {
+            continue;
+          }
+          total += Bytes.toInt(cell.getValueArray(), cell.getValueOffset());
+        }
+      } while (moreRows);
+    } catch (IOException scanFailure) {
+      recordFailure(controller, scanFailure);
+      // -1 is the error marker sent back to the client.
+      total = -1;
+    } finally {
+      if (regionScanner != null) {
+        try {
+          regionScanner.close();
+        } catch (IOException closeFailure) {
+          recordFailure(controller, closeFailure);
+          total = -1;
+        }
+      }
+    }
+    LOG.info("Returning result " + total);
+    done.run(SumResponse.newBuilder().setSum(total).build());
+  }
+
+  /** Publishes {@code cause} on the controller and logs that the sum is being forced to -1. */
+  private static void recordFailure(RpcController controller, IOException cause) {
+    CoprocessorRpcUtils.setControllerException(controller, cause);
+    LOG.info("Setting sum result to -1 to indicate error", cause);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointNullResponse.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointNullResponse.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointNullResponse.java
new file mode 100644
index 0000000..22dac6d
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointNullResponse.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationServiceNullResponse;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationNullResponseSumRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationNullResponseSumResponse;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+
+/**
+ * Test coprocessor endpoint that answers {@code null} whenever the request lands on the last
+ * region of the table (the region whose end key is empty). Tests use it to check that
+ * {@code null} response values are handled correctly.
+ */
+public class ColumnAggregationEndpointNullResponse extends ColumnAggregationServiceNullResponse
+    implements Coprocessor, CoprocessorService {
+  private static final Log LOG = LogFactory.getLog(ColumnAggregationEndpointNullResponse.class);
+  private RegionCoprocessorEnvironment env = null;
+
+  /** Returns this object, which is itself the protobuf service implementation. */
+  @Override
+  public Service getService() {
+    return this;
+  }
+
+  /**
+   * Stores the region environment for later use by {@link #sum}.
+   *
+   * @throws CoprocessorException if {@code env} is not a {@link RegionCoprocessorEnvironment}
+   */
+  @Override
+  public void start(CoprocessorEnvironment env) throws IOException {
+    if (!(env instanceof RegionCoprocessorEnvironment)) {
+      throw new CoprocessorException("Must be loaded on a table region!");
+    }
+    this.env = (RegionCoprocessorEnvironment) env;
+  }
+
+  @Override
+  public void stop(CoprocessorEnvironment env) throws IOException {
+    // Nothing was acquired in start(), so there is nothing to release.
+  }
+
+  /**
+   * Sums the int values of the requested column over this region and passes the total to
+   * {@code done}; for the last region of the table {@code done} receives {@code null} instead.
+   * An {@link IOException} from the scan or the scanner close is set on {@code controller} and
+   * turns the reported sum into -1.
+   */
+  @Override
+  public void sum(RpcController controller, ColumnAggregationNullResponseSumRequest request,
+      RpcCallback<ColumnAggregationNullResponseSumResponse> done) {
+    // The pb message makes the family mandatory; the qualifier is optional.
+    final byte[] family = request.getFamily().toByteArray();
+    final byte[] qualifier;
+    final Scan scan = new Scan();
+    if (request.hasQualifier()) {
+      qualifier = request.getQualifier().toByteArray();
+      scan.addColumn(family, qualifier);
+    } else {
+      qualifier = null;
+      scan.addFamily(family);
+    }
+
+    int total = 0;
+    InternalScanner regionScanner = null;
+    try {
+      Region region = this.env.getRegion();
+      boolean lastRegion =
+          Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW);
+      if (lastRegion) {
+        // Deliberately answer null so that callers' null handling gets exercised.
+        done.run(null);
+        return;
+      }
+      regionScanner = region.getScanner(scan);
+      List<Cell> batch = new ArrayList<Cell>();
+      boolean moreRows;
+      do {
+        batch.clear();
+        moreRows = regionScanner.next(batch);
+        for (Cell cell : batch) {
+          if (!CellUtil.matchingQualifier(cell, qualifier)) {
+            continue;
+          }
+          total += Bytes.toInt(cell.getValueArray(), cell.getValueOffset());
+        }
+      } while (moreRows);
+    } catch (IOException scanFailure) {
+      recordFailure(controller, scanFailure);
+      // -1 is the error marker sent back to the client.
+      total = -1;
+    } finally {
+      if (regionScanner != null) {
+        try {
+          regionScanner.close();
+        } catch (IOException closeFailure) {
+          recordFailure(controller, closeFailure);
+          total = -1;
+        }
+      }
+    }
+
+    ColumnAggregationNullResponseSumResponse response =
+        ColumnAggregationNullResponseSumResponse.newBuilder().setSum(total).build();
+    done.run(response);
+    LOG.info("Returning sum " + total + " for region " +
+        Bytes.toStringBinary(env.getRegion().getRegionInfo().getRegionName()));
+  }
+
+  /** Publishes {@code cause} on the controller and logs that the sum is being forced to -1. */
+  private static void recordFailure(RpcController controller, IOException cause) {
+    CoprocessorRpcUtils.setControllerException(controller, cause);
+    LOG.info("Setting sum result to -1 to indicate error", cause);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointWithErrors.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointWithErrors.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointWithErrors.java
new file mode 100644
index 0000000..c75fb31
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointWithErrors.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos.ColumnAggregationWithErrorsSumRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos.ColumnAggregationWithErrorsSumResponse;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+
+/**
+ * Test coprocessor endpoint that always throws a {@link DoNotRetryIOException} for requests on
+ * the last region in the table. This allows tests to ensure correct error handling of
+ * coprocessor endpoints throwing exceptions.
+ */
+public class ColumnAggregationEndpointWithErrors
+ extends
+ ColumnAggregationWithErrorsProtos.ColumnAggregationServiceWithErrors
+implements Coprocessor, CoprocessorService {
+ private static final Log LOG = LogFactory.getLog(ColumnAggregationEndpointWithErrors.class);
+ private RegionCoprocessorEnvironment env = null;
+ @Override
+ public Service getService() {
+ return this;
+ }
+
+ @Override
+ public void start(CoprocessorEnvironment env) throws IOException {
+ if (env instanceof RegionCoprocessorEnvironment) {
+ this.env = (RegionCoprocessorEnvironment)env;
+ return;
+ }
+ throw new CoprocessorException("Must be loaded on a table region!");
+ }
+
+ @Override
+ public void stop(CoprocessorEnvironment env) throws IOException {
+ // Nothing to do.
+ }
+
+ @Override
+ public void sum(RpcController controller, ColumnAggregationWithErrorsSumRequest request,
+ RpcCallback<ColumnAggregationWithErrorsSumResponse> done) {
+ // aggregate at each region
+ Scan scan = new Scan();
+ // Family is required in pb. Qualifier is not.
+ byte[] family = request.getFamily().toByteArray();
+ byte[] qualifier = request.hasQualifier() ? request.getQualifier().toByteArray() : null;
+ if (request.hasQualifier()) {
+ scan.addColumn(family, qualifier);
+ } else {
+ scan.addFamily(family);
+ }
+ int sumResult = 0;
+ InternalScanner scanner = null;
+ try {
+ Region region = this.env.getRegion();
+ // throw an exception for requests to the last region in the table, to test error handling
+ if (Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW)) {
+ throw new DoNotRetryIOException("An expected exception");
+ }
+ scanner = region.getScanner(scan);
+ List<Cell> curVals = new ArrayList<Cell>();
+ boolean hasMore = false;
+ do {
+ curVals.clear();
+ hasMore = scanner.next(curVals);
+ for (Cell kv : curVals) {
+ if (CellUtil.matchingQualifier(kv, qualifier)) {
+ sumResult += Bytes.toInt(kv.getValueArray(), kv.getValueOffset());
+ }
+ }
+ } while (hasMore);
+ } catch (IOException e) {
+ CoprocessorRpcUtils.setControllerException(controller, e);
+ // Set result to -1 to indicate error.
+ sumResult = -1;
+ LOG.info("Setting sum result to -1 to indicate error", e);
+ } finally {
+ if (scanner != null) {
+ try {
+ scanner.close();
+ } catch (IOException e) {
+ CoprocessorRpcUtils.setControllerException(controller, e);
+ sumResult = -1;
+ LOG.info("Setting sum result to -1 to indicate error", e);
+ }
+ }
+ }
+ done.run(ColumnAggregationWithErrorsSumResponse.newBuilder().setSum(sumResult).build());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java
new file mode 100644
index 0000000..5b7c1e9
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
+import org.apache.hadoop.hbase.util.Threads;
+
+import java.io.IOException;
+
+/**
+ * Test implementation of a coprocessor endpoint exposing the
+ * {@link TestRpcServiceProtos.TestProtobufRpcProto} service methods. For internal use by unit tests
+ * only.
+ */
+public class ProtobufCoprocessorService extends TestRpcServiceProtos.TestProtobufRpcProto
+ implements CoprocessorService, Coprocessor {
+ public ProtobufCoprocessorService() {
+ }
+
+ @Override
+ public Service getService() {
+ return this;
+ }
+
+ @Override
+ public void ping(RpcController controller, TestProtos.EmptyRequestProto request,
+ RpcCallback<TestProtos.EmptyResponseProto> done) {
+ done.run(TestProtos.EmptyResponseProto.getDefaultInstance());
+ }
+
+ @Override
+ public void echo(RpcController controller, TestProtos.EchoRequestProto request,
+ RpcCallback<TestProtos.EchoResponseProto> done) {
+ String message = request.getMessage();
+ done.run(TestProtos.EchoResponseProto.newBuilder().setMessage(message).build());
+ }
+
+ @Override
+ public void error(RpcController controller, TestProtos.EmptyRequestProto request,
+ RpcCallback<TestProtos.EmptyResponseProto> done) {
+ CoprocessorRpcUtils.setControllerException(controller, new IOException("Test exception"));
+ done.run(null);
+ }
+
+ @Override
+ public void pause(RpcController controller, PauseRequestProto request,
+ RpcCallback<EmptyResponseProto> done) {
+ Threads.sleepWithoutInterrupt(request.getMs());
+ done.run(EmptyResponseProto.getDefaultInstance());
+ }
+
+ @Override
+ public void addr(RpcController controller, EmptyRequestProto request,
+ RpcCallback<AddrResponseProto> done) {
+ done.run(AddrResponseProto.newBuilder().setAddr(RpcServer.getRemoteAddress().getHostAddress())
+ .build());
+ }
+
+ @Override
+ public void start(CoprocessorEnvironment env) throws IOException {
+ // To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public void stop(CoprocessorEnvironment env) throws IOException {
+ // To change body of implemented methods use File | Settings | File Templates.
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestBatchCoprocessorEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestBatchCoprocessorEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestBatchCoprocessorEndpoint.java
new file mode 100644
index 0000000..c023437
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestBatchCoprocessorEndpoint.java
@@ -0,0 +1,283 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.SumResponse;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos.ColumnAggregationWithErrorsSumRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos.ColumnAggregationWithErrorsSumResponse;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationNullResponseSumRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationNullResponseSumResponse;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationServiceNullResponse;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.ServiceException;
+
+/**
+ * Test cases that verify batch execution of coprocessor endpoints through
+ * {@code Table.batchCoprocessorService}.
+ */
+@Category({CoprocessorTests.class, MediumTests.class})
+public class TestBatchCoprocessorEndpoint {
+  private static final Log LOG = LogFactory.getLog(TestBatchCoprocessorEndpoint.class);
+
+  private static final TableName TEST_TABLE =
+      TableName.valueOf("TestTable");
+  private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
+  private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier");
+  private static final byte[] ROW = Bytes.toBytes("testRow");
+
+  private static final int ROWSIZE = 20;
+  // Indexes into ROWS of the two split keys handed to createTable.
+  private static final int rowSeperator1 = 5;
+  private static final int rowSeperator2 = 12;
+  private static final byte[][] ROWS = makeN(ROW, ROWSIZE);
+
+  private static final HBaseTestingUtility util = new HBaseTestingUtility();
+
+  /**
+   * Starts a two-server mini cluster with the test endpoints loaded, creates the pre-split
+   * test table and writes one row per entry of {@code ROWS}, storing the row index as value.
+   */
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    // set configure to indicate which cp should be loaded
+    Configuration conf = util.getConfiguration();
+    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+        org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName(),
+        ProtobufCoprocessorService.class.getName(),
+        ColumnAggregationEndpointWithErrors.class.getName(),
+        ColumnAggregationEndpointNullResponse.class.getName());
+    conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
+        ProtobufCoprocessorService.class.getName());
+    util.startMiniCluster(2);
+
+    Admin admin = util.getHBaseAdmin();
+    try {
+      HTableDescriptor desc = new HTableDescriptor(TEST_TABLE);
+      desc.addFamily(new HColumnDescriptor(TEST_FAMILY));
+      admin.createTable(desc, new byte[][]{ROWS[rowSeperator1], ROWS[rowSeperator2]});
+      util.waitUntilAllRegionsAssigned(TEST_TABLE);
+    } finally {
+      // Close even when table creation fails so the handle is not leaked.
+      admin.close();
+    }
+
+    Table table = util.getConnection().getTable(TEST_TABLE);
+    try {
+      for (int i = 0; i < ROWSIZE; i++) {
+        Put put = new Put(ROWS[i]);
+        put.addColumn(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(i));
+        table.put(put);
+      }
+    } finally {
+      table.close();
+    }
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    util.shutdownMiniCluster();
+  }
+
+  /**
+   * Calls the null-response endpoint over all rows and expects the sum of the row indexes
+   * below {@code rowSeperator2} only.
+   */
+  @Test
+  public void testAggregationNullResponse() throws Throwable {
+    Table table = util.getConnection().getTable(TEST_TABLE);
+    try {
+      ColumnAggregationNullResponseSumRequest.Builder builder =
+          ColumnAggregationNullResponseSumRequest.newBuilder();
+      builder.setFamily(ByteString.copyFrom(TEST_FAMILY));
+      if (TEST_QUALIFIER != null && TEST_QUALIFIER.length > 0) {
+        builder.setQualifier(ByteString.copyFrom(TEST_QUALIFIER));
+      }
+      Map<byte[], ColumnAggregationNullResponseSumResponse> results =
+          table.batchCoprocessorService(
+              ColumnAggregationServiceNullResponse.getDescriptor().findMethodByName("sum"),
+              builder.build(), ROWS[0], ROWS[ROWS.length - 1],
+              ColumnAggregationNullResponseSumResponse.getDefaultInstance());
+
+      int sumResult = 0;
+      for (Map.Entry<byte[], ColumnAggregationNullResponseSumResponse> e :
+          results.entrySet()) {
+        LOG.info("Got value " + e.getValue().getSum() + " for region "
+            + Bytes.toStringBinary(e.getKey()));
+        sumResult += e.getValue().getSum();
+      }
+      assertEquals("Invalid result", sumOfRange(0, rowSeperator2), sumResult);
+    } finally {
+      // Release the table even when the call or the assertion above fails.
+      table.close();
+    }
+  }
+
+  /**
+   * Builds {@code n} row keys by appending the two-digit, zero-padded index to {@code base}.
+   */
+  private static byte[][] makeN(byte[] base, int n) {
+    byte[][] ret = new byte[n][];
+    for (int i = 0; i < n; i++) {
+      ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%02d", i)));
+    }
+    return ret;
+  }
+
+  /** Returns the sum of the integers in {@code [fromInclusive, toExclusive)}. */
+  private static int sumOfRange(int fromInclusive, int toExclusive) {
+    int total = 0;
+    for (int i = fromInclusive; i < toExclusive; i++) {
+      total += i;
+    }
+    return total;
+  }
+
+  /** Logs every per-region sum in {@code results} and returns their total. */
+  private static int logAndTotal(Map<byte[], SumResponse> results) {
+    int total = 0;
+    for (Map.Entry<byte[], SumResponse> e : results.entrySet()) {
+      LOG.info("Got value " + e.getValue().getSum() + " for region "
+          + Bytes.toStringBinary(e.getKey()));
+      total += e.getValue().getSum();
+    }
+    return total;
+  }
+
+  /**
+   * Invokes the {@code sum} method of the column aggregation service as a batch call over the
+   * given row range.
+   *
+   * @param qualifier qualifier to restrict the sum to; left out of the request when null or
+   *          empty
+   * @return the responses keyed by region
+   */
+  private Map<byte[], SumResponse> sum(final Table table, final byte[] family,
+      final byte[] qualifier, final byte[] start, final byte[] end) throws ServiceException,
+      Throwable {
+    ColumnAggregationProtos.SumRequest.Builder builder = ColumnAggregationProtos.SumRequest
+        .newBuilder();
+    builder.setFamily(ByteString.copyFrom(family));
+    if (qualifier != null && qualifier.length > 0) {
+      builder.setQualifier(ByteString.copyFrom(qualifier));
+    }
+    return table.batchCoprocessorService(
+        ColumnAggregationProtos.ColumnAggregationService.getDescriptor().findMethodByName("sum"),
+        builder.build(), start, end, ColumnAggregationProtos.SumResponse.getDefaultInstance());
+  }
+
+  /**
+   * Checks the aggregated sum first over every row and then over the rows from the first
+   * split key onwards.
+   */
+  private void assertFullAndPartialSums(Table table) throws Throwable {
+    Map<byte[], SumResponse> results = sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[0],
+        ROWS[ROWS.length - 1]);
+    assertEquals("Invalid result", sumOfRange(0, ROWSIZE), logAndTotal(results));
+
+    // scan: for region 2 and region 3
+    results = sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[rowSeperator1],
+        ROWS[ROWS.length - 1]);
+    assertEquals("Invalid result", sumOfRange(rowSeperator1, ROWSIZE), logAndTotal(results));
+  }
+
+  @Test
+  public void testAggregationWithReturnValue() throws Throwable {
+    Table table = util.getConnection().getTable(TEST_TABLE);
+    try {
+      assertFullAndPartialSums(table);
+    } finally {
+      table.close();
+    }
+  }
+
+  @Test
+  public void testAggregation() throws Throwable {
+    Table table = util.getConnection().getTable(TEST_TABLE);
+    try {
+      assertFullAndPartialSums(table);
+    } finally {
+      table.close();
+    }
+  }
+
+  /**
+   * Calls the error-throwing endpoint over all rows with a callback, and expects both a
+   * failure from the batch call and a collected sum equal to the row indexes below
+   * {@code rowSeperator2}.
+   */
+  @Test
+  public void testAggregationWithErrors() throws Throwable {
+    Table table = util.getConnection().getTable(TEST_TABLE);
+    try {
+      // The callback may be invoked from other threads, hence the synchronized wrapper.
+      final Map<byte[], ColumnAggregationWithErrorsSumResponse> results =
+          Collections.synchronizedMap(
+              new TreeMap<byte[], ColumnAggregationWithErrorsSumResponse>(
+                  Bytes.BYTES_COMPARATOR));
+      ColumnAggregationWithErrorsSumRequest.Builder builder =
+          ColumnAggregationWithErrorsSumRequest.newBuilder();
+      builder.setFamily(ByteString.copyFrom(TEST_FAMILY));
+      if (TEST_QUALIFIER != null && TEST_QUALIFIER.length > 0) {
+        builder.setQualifier(ByteString.copyFrom(TEST_QUALIFIER));
+      }
+
+      boolean hasError = false;
+      try {
+        table.batchCoprocessorService(
+            ColumnAggregationWithErrorsProtos.ColumnAggregationServiceWithErrors.getDescriptor()
+                .findMethodByName("sum"),
+            builder.build(), ROWS[0], ROWS[ROWS.length - 1],
+            ColumnAggregationWithErrorsSumResponse.getDefaultInstance(),
+            new Batch.Callback<ColumnAggregationWithErrorsSumResponse>() {
+
+              @Override
+              public void update(byte[] region, byte[] row,
+                  ColumnAggregationWithErrorsSumResponse result) {
+                results.put(region, result);
+              }
+            });
+      } catch (Throwable t) {
+        // The failure is the expected outcome here; remember it and inspect partial results.
+        LOG.info("Exceptions in coprocessor service", t);
+        hasError = true;
+      }
+
+      int sumResult = 0;
+      for (Map.Entry<byte[], ColumnAggregationWithErrorsSumResponse> e : results.entrySet()) {
+        LOG.info("Got value " + e.getValue().getSum() + " for region "
+            + Bytes.toStringBinary(e.getKey()));
+        sumResult += e.getValue().getSum();
+      }
+      assertEquals("Invalid result", sumOfRange(0, rowSeperator2), sumResult);
+      assertTrue(hasError);
+    } finally {
+      // Release the table even when an assertion above fails.
+      table.close();
+    }
+  }
+}