You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/02/12 02:11:59 UTC
[hbase] branch master updated: HBASE-21868 Remove legacy bulk load support
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 78d3d56 HBASE-21868 Remove legacy bulk load support
78d3d56 is described below
commit 78d3d5628a6c69b1ea22a2c234c8eb558da5d45b
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Mon Feb 11 16:05:24 2019 +0800
HBASE-21868 Remove legacy bulk load support
Signed-off-by: Michael Stack <st...@apache.org>
---
.../security/access/SecureBulkLoadEndpoint.java | 189 ---------------------
.../src/main/protobuf/SecureBulkLoad.proto | 48 ------
.../regionserver/SecureBulkLoadEndpointClient.java | 163 ------------------
...HRegionServerBulkLoadWithOldSecureEndpoint.java | 187 --------------------
...estReplicationSyncUpToolWithBulkLoadedData.java | 7 -
.../hadoop/hbase/regionserver/RSRpcServices.java | 25 +--
.../hadoop/hbase/tool/LoadIncrementalHFiles.java | 11 --
.../TestHRegionServerBulkLoadWithOldClient.java | 177 -------------------
8 files changed, 3 insertions(+), 804 deletions(-)
diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
deleted file mode 100644
index fb161d9..0000000
--- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.security.access;
-
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.Service;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.CoprocessorEnvironment;
-import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
-import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
-import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadHFilesRequest;
-import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadHFilesResponse;
-import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadService;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
-import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Coprocessor service for bulk loads in secure mode.
- * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0
- */
-@CoreCoprocessor
-@InterfaceAudience.Private
-@Deprecated
-public class SecureBulkLoadEndpoint extends SecureBulkLoadService implements RegionCoprocessor {
- public static final long VERSION = 0L;
-
- private static final Logger LOG = LoggerFactory.getLogger(SecureBulkLoadEndpoint.class);
-
- private RegionCoprocessorEnvironment env;
- private RegionServerServices rsServices;
-
- @Override
- public void start(CoprocessorEnvironment env) {
- this.env = (RegionCoprocessorEnvironment)env;
- rsServices = ((HasRegionServerServices)this.env).getRegionServerServices();
- LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in future releases.");
- LOG.warn("Secure bulk load has been integrated into HBase core.");
- }
-
- @Override
- public void stop(CoprocessorEnvironment env) throws IOException {
- }
-
- @Override
- public void prepareBulkLoad(RpcController controller, PrepareBulkLoadRequest request,
- RpcCallback<PrepareBulkLoadResponse> done) {
- try {
- SecureBulkLoadManager secureBulkLoadManager = this.rsServices.getSecureBulkLoadManager();
-
- String bulkToken = secureBulkLoadManager.prepareBulkLoad((HRegion) this.env.getRegion(),
- convert(request));
- done.run(PrepareBulkLoadResponse.newBuilder().setBulkToken(bulkToken).build());
- } catch (IOException e) {
- CoprocessorRpcUtils.setControllerException(controller, e);
- }
- done.run(null);
- }
-
- /**
- * Convert from CPEP protobuf 2.5 to internal protobuf 3.3.
- */
- org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest
- convert(PrepareBulkLoadRequest request)
- throws org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException {
- byte [] bytes = request.toByteArray();
- org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest.Builder
- builder =
- org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest.
- newBuilder();
- builder.mergeFrom(bytes);
- return builder.build();
- }
-
- @Override
- public void cleanupBulkLoad(RpcController controller, CleanupBulkLoadRequest request,
- RpcCallback<CleanupBulkLoadResponse> done) {
- try {
- SecureBulkLoadManager secureBulkLoadManager = this.rsServices.getSecureBulkLoadManager();
- secureBulkLoadManager.cleanupBulkLoad((HRegion) this.env.getRegion(), convert(request));
- done.run(CleanupBulkLoadResponse.newBuilder().build());
- } catch (IOException e) {
- CoprocessorRpcUtils.setControllerException(controller, e);
- }
- done.run(null);
- }
-
- /**
- * Convert from CPEP protobuf 2.5 to internal protobuf 3.3.
- */
- org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest
- convert(CleanupBulkLoadRequest request)
- throws org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException {
- byte [] bytes = request.toByteArray();
- org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest.Builder
- builder =
- org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest.
- newBuilder();
- builder.mergeFrom(bytes);
- return builder.build();
- }
-
- @Override
- public void secureBulkLoadHFiles(RpcController controller, SecureBulkLoadHFilesRequest request,
- RpcCallback<SecureBulkLoadHFilesResponse> done) {
- boolean loaded = false;
- Map<byte[], List<Path>> map = null;
- try {
- SecureBulkLoadManager secureBulkLoadManager = this.rsServices.getSecureBulkLoadManager();
- BulkLoadHFileRequest bulkLoadHFileRequest = ConvertSecureBulkLoadHFilesRequest(request);
- map = secureBulkLoadManager.secureBulkLoadHFiles((HRegion) this.env.getRegion(),
- convert(bulkLoadHFileRequest));
- loaded = map != null && !map.isEmpty();
- } catch (IOException e) {
- CoprocessorRpcUtils.setControllerException(controller, e);
- }
- done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(loaded).build());
- }
-
- /**
- * Convert from CPEP protobuf 2.5 to internal protobuf 3.3.
- */
- org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest
- convert(BulkLoadHFileRequest request)
- throws org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException {
- byte [] bytes = request.toByteArray();
- org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.Builder
- builder =
- org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.
- newBuilder();
- builder.mergeFrom(bytes);
- return builder.build();
- }
-
- private BulkLoadHFileRequest ConvertSecureBulkLoadHFilesRequest(
- SecureBulkLoadHFilesRequest request) {
- BulkLoadHFileRequest.Builder bulkLoadHFileRequest = BulkLoadHFileRequest.newBuilder();
- RegionSpecifier region =
- ProtobufUtil.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, this.env
- .getRegionInfo().getRegionName());
- bulkLoadHFileRequest.setRegion(region).setFsToken(request.getFsToken())
- .setBulkToken(request.getBulkToken()).setAssignSeqNum(request.getAssignSeqNum())
- .addAllFamilyPath(request.getFamilyPathList());
- return bulkLoadHFileRequest.build();
- }
-
- @Override
- public Iterable<Service> getServices() {
- return Collections.singleton(this);
- }
-}
diff --git a/hbase-endpoint/src/main/protobuf/SecureBulkLoad.proto b/hbase-endpoint/src/main/protobuf/SecureBulkLoad.proto
deleted file mode 100644
index d86d162..0000000
--- a/hbase-endpoint/src/main/protobuf/SecureBulkLoad.proto
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package hbase.pb;
-
-option java_package = "org.apache.hadoop.hbase.protobuf.generated";
-option java_outer_classname = "SecureBulkLoadProtos";
-option java_generic_services = true;
-option java_generate_equals_and_hash = true;
-option optimize_for = SPEED;
-
-import 'Client.proto';
-
-message SecureBulkLoadHFilesRequest {
- repeated BulkLoadHFileRequest.FamilyPath family_path = 1;
- optional bool assign_seq_num = 2;
- required DelegationToken fs_token = 3;
- required string bulk_token = 4;
-}
-
-message SecureBulkLoadHFilesResponse {
- required bool loaded = 1;
-}
-
-service SecureBulkLoadService {
- rpc PrepareBulkLoad(PrepareBulkLoadRequest)
- returns (PrepareBulkLoadResponse);
-
- rpc SecureBulkLoadHFiles(SecureBulkLoadHFilesRequest)
- returns (SecureBulkLoadHFilesResponse);
-
- rpc CleanupBulkLoad(CleanupBulkLoadRequest)
- returns (CleanupBulkLoadResponse);
-}
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadEndpointClient.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadEndpointClient.java
deleted file mode 100644
index 0d15f93..0000000
--- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadEndpointClient.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
-import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
-import org.apache.hadoop.hbase.ipc.ServerRpcController;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.DelegationToken;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
-import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos;
-import org.apache.hadoop.hbase.util.ByteStringer;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.security.token.Token;
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * Client proxy for SecureBulkLoadProtocol used in conjunction with SecureBulkLoadEndpoint
- * @deprecated Use for backward compatibility testing only. Will be removed when
- * SecureBulkLoadEndpoint is not supported.
- */
-@Deprecated
-@InterfaceAudience.Private
-public class SecureBulkLoadEndpointClient {
- private Table table;
-
- public SecureBulkLoadEndpointClient(Table table) {
- this.table = table;
- }
-
- public String prepareBulkLoad(final TableName tableName) throws IOException {
- try {
- CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW);
- SecureBulkLoadProtos.SecureBulkLoadService instance =
- ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
-
- ServerRpcController controller = new ServerRpcController();
-
- CoprocessorRpcUtils.BlockingRpcCallback<PrepareBulkLoadResponse> rpcCallback =
- new CoprocessorRpcUtils.BlockingRpcCallback<>();
-
- PrepareBulkLoadRequest request =
- PrepareBulkLoadRequest.newBuilder()
- .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
-
- instance.prepareBulkLoad(controller, request, rpcCallback);
-
- PrepareBulkLoadResponse response = rpcCallback.get();
- if (controller.failedOnException()) {
- throw controller.getFailedOn();
- }
-
- return response.getBulkToken();
- } catch (Throwable throwable) {
- throw new IOException(throwable);
- }
- }
-
- public void cleanupBulkLoad(final String bulkToken) throws IOException {
- try {
- CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW);
- SecureBulkLoadProtos.SecureBulkLoadService instance =
- ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
-
- ServerRpcController controller = new ServerRpcController();
-
- CoprocessorRpcUtils.BlockingRpcCallback<CleanupBulkLoadResponse> rpcCallback =
- new CoprocessorRpcUtils.BlockingRpcCallback<>();
-
- CleanupBulkLoadRequest request =
- CleanupBulkLoadRequest.newBuilder()
- .setBulkToken(bulkToken).build();
-
- instance.cleanupBulkLoad(controller,
- request,
- rpcCallback);
-
- if (controller.failedOnException()) {
- throw controller.getFailedOn();
- }
- } catch (Throwable throwable) {
- throw new IOException(throwable);
- }
- }
-
- public boolean bulkLoadHFiles(final List<Pair<byte[], String>> familyPaths,
- final Token<?> userToken, final String bulkToken, final byte[] startRow)
- throws IOException {
- // we never want to send a batch of HFiles to all regions, thus cannot call
- // HTable#coprocessorService methods that take start and end rowkeys; see HBASE-9639
- try {
- CoprocessorRpcChannel channel = table.coprocessorService(startRow);
- SecureBulkLoadProtos.SecureBulkLoadService instance =
- ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
-
- DelegationToken protoDT =
- DelegationToken.newBuilder().build();
- if(userToken != null) {
- protoDT =
- DelegationToken.newBuilder()
- .setIdentifier(ByteStringer.wrap(userToken.getIdentifier()))
- .setPassword(ByteStringer.wrap(userToken.getPassword()))
- .setKind(userToken.getKind().toString())
- .setService(userToken.getService().toString()).build();
- }
-
- List<ClientProtos.BulkLoadHFileRequest.FamilyPath> protoFamilyPaths =
- new ArrayList<>(familyPaths.size());
- for(Pair<byte[], String> el: familyPaths) {
- protoFamilyPaths.add(ClientProtos.BulkLoadHFileRequest.FamilyPath.newBuilder()
- .setFamily(ByteStringer.wrap(el.getFirst()))
- .setPath(el.getSecond()).build());
- }
-
- SecureBulkLoadProtos.SecureBulkLoadHFilesRequest request =
- SecureBulkLoadProtos.SecureBulkLoadHFilesRequest.newBuilder()
- .setFsToken(protoDT)
- .addAllFamilyPath(protoFamilyPaths)
- .setBulkToken(bulkToken).build();
-
- ServerRpcController controller = new ServerRpcController();
- CoprocessorRpcUtils.BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse>
- rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<>();
- instance.secureBulkLoadHFiles(controller,
- request,
- rpcCallback);
-
- SecureBulkLoadProtos.SecureBulkLoadHFilesResponse response = rpcCallback.get();
- if (controller.failedOnException()) {
- throw controller.getFailedOn();
- }
- return response.getLoaded();
- } catch (Throwable throwable) {
- throw new IOException(throwable);
- }
- }
-}
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java
deleted file mode 100644
index 49697b8..0000000
--- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
-import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.ClientServiceCallable;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.RpcRetryingCaller;
-import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Ignore;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-
-import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
-
-/**
- * Tests bulk loading of HFiles with old secure Endpoint client for backward compatibility. Will be
- * removed when old non-secure client for backward compatibility is not supported.
- */
-@RunWith(Parameterized.class)
-@Category({RegionServerTests.class, LargeTests.class})
-@Ignore // BROKEN. FIX OR REMOVE.
-public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionServerBulkLoad {
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestHRegionServerBulkLoadWithOldSecureEndpoint.class);
-
- public TestHRegionServerBulkLoadWithOldSecureEndpoint(int duration) {
- super(duration);
- }
-
- private static final Logger LOG =
- LoggerFactory.getLogger(TestHRegionServerBulkLoadWithOldSecureEndpoint.class);
-
- @BeforeClass
- public static void setUpBeforeClass() throws IOException {
- conf.setInt("hbase.rpc.timeout", 10 * 1000);
- conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
- "org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
- }
-
- public static class AtomicHFileLoader extends RepeatingTestThread {
- final AtomicLong numBulkLoads = new AtomicLong();
- final AtomicLong numCompactions = new AtomicLong();
- private TableName tableName;
-
- public AtomicHFileLoader(TableName tableName, TestContext ctx, byte[][] targetFamilies)
- throws IOException {
- super(ctx);
- this.tableName = tableName;
- }
-
- public void doAnAction() throws Exception {
- long iteration = numBulkLoads.getAndIncrement();
- Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d",
- iteration));
-
- // create HFiles for different column families
- FileSystem fs = UTIL.getTestFileSystem();
- byte[] val = Bytes.toBytes(String.format("%010d", iteration));
- final List<Pair<byte[], String>> famPaths = new ArrayList<>(NUM_CFS);
- for (int i = 0; i < NUM_CFS; i++) {
- Path hfile = new Path(dir, family(i));
- byte[] fam = Bytes.toBytes(family(i));
- createHFile(fs, hfile, fam, QUAL, val, 1000);
- famPaths.add(new Pair<>(fam, hfile.toString()));
- }
-
- // bulk load HFiles
- final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection();
- Table table = conn.getTable(tableName);
- final String bulkToken = new SecureBulkLoadEndpointClient(table).prepareBulkLoad(tableName);
- RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration());
- ClientServiceCallable<Void> callable =
- new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"),
- rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
- @Override
- protected Void rpcCall() throws Exception {
- LOG.debug("Going to connect to server " + getLocation() + " for row " +
- Bytes.toStringBinary(getRow()));
- try (Table table = conn.getTable(getTableName())) {
- boolean loaded = new SecureBulkLoadEndpointClient(table).bulkLoadHFiles(famPaths,
- null, bulkToken, getLocation().getRegionInfo().getStartKey());
- }
- return null;
- }
- };
- RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
- RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
- caller.callWithRetries(callable, Integer.MAX_VALUE);
-
- // Periodically do compaction to reduce the number of open file handles.
- if (numBulkLoads.get() % 5 == 0) {
- // 5 * 50 = 250 open file handles!
- callable = new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"),
- rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
- @Override
- protected Void rpcCall() throws Exception {
- LOG.debug("compacting " + getLocation() + " for row "
- + Bytes.toStringBinary(getRow()));
- AdminProtos.AdminService.BlockingInterface server =
- conn.getAdmin(getLocation().getServerName());
- CompactRegionRequest request =
- RequestConverter.buildCompactRegionRequest(
- getLocation().getRegionInfo().getRegionName(), true, null);
- server.compactRegion(null, request);
- numCompactions.incrementAndGet();
- return null;
- }
- };
- caller.callWithRetries(callable, Integer.MAX_VALUE);
- }
- }
- }
-
- void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
- throws Exception {
- setupTable(tableName, 10);
-
- TestContext ctx = new TestContext(UTIL.getConfiguration());
-
- AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
- ctx.addThread(loader);
-
- List<AtomicScanReader> scanners = Lists.newArrayList();
- for (int i = 0; i < numScanners; i++) {
- AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
- scanners.add(scanner);
- ctx.addThread(scanner);
- }
-
- ctx.startThreads();
- ctx.waitFor(millisToRun);
- ctx.stop();
-
- LOG.info("Loaders:");
- LOG.info(" loaded " + loader.numBulkLoads.get());
- LOG.info(" compations " + loader.numCompactions.get());
-
- LOG.info("Scanners:");
- for (AtomicScanReader scanner : scanners) {
- LOG.info(" scanned " + scanner.numScans.get());
- LOG.info(" verified " + scanner.numRowsScanned.get() + " rows");
- }
- }
-}
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
index 96010d9..eb575c5 100644
--- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -61,12 +60,6 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
conf1.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
conf1.set("hbase.replication.source.fs.conf.provider",
TestSourceFSConfigurationProvider.class.getCanonicalName());
- String classes = conf1.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
- if (!classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint")) {
- classes = classes + ",org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint";
- conf1.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, classes);
- }
-
TestReplicationBase.setUpBeforeClass();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 592f99c..9b99ff8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -2382,7 +2382,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
checkOpen();
requestCount.increment();
HRegion region = getRegion(request.getRegion());
- Map<byte[], List<Path>> map = null;
final boolean spaceQuotaEnabled = QuotaUtil.isQuotaEnabled(getConfiguration());
long sizeToBeLoaded = -1;
@@ -2401,27 +2400,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
sizeToBeLoaded = enforcement.computeBulkLoadSize(regionServer.getFileSystem(), filePaths);
}
}
-
- List<Pair<byte[], String>> familyPaths = new ArrayList<>(request.getFamilyPathCount());
- for (FamilyPath familyPath : request.getFamilyPathList()) {
- familyPaths.add(new Pair<>(familyPath.getFamily().toByteArray(), familyPath.getPath()));
- }
- if (!request.hasBulkToken()) {
- if (region.getCoprocessorHost() != null) {
- region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
- }
- try {
- map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
- request.getCopyFile());
- } finally {
- if (region.getCoprocessorHost() != null) {
- region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map);
- }
- }
- } else {
- // secure bulk load
- map = regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request);
- }
+ // secure bulk load
+ Map<byte[], List<Path>> map =
+ regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request);
BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
builder.setLoaded(map != null);
if (map != null) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
index 314f2cb..82f5da4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
@@ -70,7 +70,6 @@ import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.SecureBulkLoadClient;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.HalfStoreFileReader;
import org.apache.hadoop.hbase.io.Reference;
@@ -394,11 +393,6 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
SecureBulkLoadClient secureClient, boolean copyFile) throws IOException {
int count = 0;
- if (isSecureBulkLoadEndpointAvailable()) {
- LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in future releases.");
- LOG.warn("Secure bulk load has been integrated into HBase core.");
- }
-
fsDelegationToken.acquireDelegationToken(queue.peek().getFilePath().getFileSystem(getConf()));
bulkToken = secureClient.prepareBulkLoad(admin.getConnection());
Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = null;
@@ -1055,11 +1049,6 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
return sb.toString();
}
- private boolean isSecureBulkLoadEndpointAvailable() {
- String classes = getConf().get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
- return classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
- }
-
/**
* Split a storefile into a top and bottom half, maintaining the metadata, recreating bloom
* filters, etc.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java
deleted file mode 100644
index de01401..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
-import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.ClientServiceCallable;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.RpcRetryingCaller;
-import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.junit.ClassRule;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-
-import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
-
-/**
- * Tests bulk loading of HFiles with old non-secure client for backward compatibility. Will be
- * removed when old non-secure client for backward compatibility is not supported.
- */
-@RunWith(Parameterized.class)
-@Category({RegionServerTests.class, LargeTests.class})
-public class TestHRegionServerBulkLoadWithOldClient extends TestHRegionServerBulkLoad {
-
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestHRegionServerBulkLoadWithOldClient.class);
-
- private static final Logger LOG =
- LoggerFactory.getLogger(TestHRegionServerBulkLoadWithOldClient.class);
-
- public TestHRegionServerBulkLoadWithOldClient(int duration) {
- super(duration);
- }
-
- public static class AtomicHFileLoader extends RepeatingTestThread {
- final AtomicLong numBulkLoads = new AtomicLong();
- final AtomicLong numCompactions = new AtomicLong();
- private TableName tableName;
-
- public AtomicHFileLoader(TableName tableName, TestContext ctx,
- byte targetFamilies[][]) throws IOException {
- super(ctx);
- this.tableName = tableName;
- }
-
- @Override
- public void doAnAction() throws Exception {
- long iteration = numBulkLoads.getAndIncrement();
- Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d",
- iteration));
-
- // create HFiles for different column families
- FileSystem fs = UTIL.getTestFileSystem();
- byte[] val = Bytes.toBytes(String.format("%010d", iteration));
- final List<Pair<byte[], String>> famPaths = new ArrayList<>(NUM_CFS);
- for (int i = 0; i < NUM_CFS; i++) {
- Path hfile = new Path(dir, family(i));
- byte[] fam = Bytes.toBytes(family(i));
- createHFile(fs, hfile, fam, QUAL, val, 1000);
- famPaths.add(new Pair<>(fam, hfile.toString()));
- }
-
- // bulk load HFiles
- final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection();
- RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration());
- ClientServiceCallable<Void> callable =
- new ClientServiceCallable<Void>(conn, tableName,
- Bytes.toBytes("aaa"), rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
- @Override
- protected Void rpcCall() throws Exception {
- LOG.info("Non-secure old client");
- byte[] regionName = getLocation().getRegionInfo().getRegionName();
- BulkLoadHFileRequest request =
- RequestConverter
- .buildBulkLoadHFileRequest(famPaths, regionName, true, null, null);
- getStub().bulkLoadHFile(null, request);
- return null;
- }
- };
- RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
- RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
- caller.callWithRetries(callable, Integer.MAX_VALUE);
-
- // Periodically do compaction to reduce the number of open file handles.
- if (numBulkLoads.get() % 5 == 0) {
- // 5 * 50 = 250 open file handles!
- callable = new ClientServiceCallable<Void>(conn, tableName,
- Bytes.toBytes("aaa"), rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
- @Override
- protected Void rpcCall() throws Exception {
- LOG.debug("compacting " + getLocation() + " for row "
- + Bytes.toStringBinary(getRow()));
- AdminProtos.AdminService.BlockingInterface server =
- conn.getAdmin(getLocation().getServerName());
- CompactRegionRequest request =
- RequestConverter.buildCompactRegionRequest(
- getLocation().getRegionInfo().getRegionName(), true, null);
- server.compactRegion(null, request);
- numCompactions.incrementAndGet();
- return null;
- }
- };
- caller.callWithRetries(callable, Integer.MAX_VALUE);
- }
- }
- }
-
- @Override
- void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
- throws Exception {
- setupTable(tableName, 10);
-
- TestContext ctx = new TestContext(UTIL.getConfiguration());
-
- AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
- ctx.addThread(loader);
-
- List<AtomicScanReader> scanners = Lists.newArrayList();
- for (int i = 0; i < numScanners; i++) {
- AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
- scanners.add(scanner);
- ctx.addThread(scanner);
- }
-
- ctx.startThreads();
- ctx.waitFor(millisToRun);
- ctx.stop();
-
- LOG.info("Loaders:");
- LOG.info(" loaded " + loader.numBulkLoads.get());
- LOG.info(" compations " + loader.numCompactions.get());
-
- LOG.info("Scanners:");
- for (AtomicScanReader scanner : scanners) {
- LOG.info(" scanned " + scanner.numScans.get());
- LOG.info(" verified " + scanner.numRowsScanned.get() + " rows");
- }
- }
-}