Posted to commits@hbase.apache.org by zh...@apache.org on 2017/09/03 12:04:42 UTC
[1/4] hbase git commit: HBASE-18699 Copy LoadIncrementalHFiles to
another package and mark the old one as deprecated
Repository: hbase
Updated Branches:
refs/heads/master 7c51d3f2e -> 9e53f2927
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
index 0fe79d1..199c2c1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
@@ -60,13 +60,13 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
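
The change in this test is purely the package move from HBASE-18699: org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles is deprecated in favor of org.apache.hadoop.hbase.tool.LoadIncrementalHFiles, with the same Tool-style entry point. A minimal sketch of what a migrated caller looks like (the directory and table name below are illustrative placeholders, not values from this patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    // new home of the tool; the mapreduce-package class is now deprecated
    import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;

    public class BulkLoadMigrationSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // same Tool-style arguments as before: <hfile dir> <table name>
        new LoadIncrementalHFiles(conf).run(new String[] { "/tmp/hfiles", "mytable" });
      }
    }
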
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/test/java/org/apache/hadoop/hbase/security/HadoopSecurityEnabledUserProviderForTesting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/HadoopSecurityEnabledUserProviderForTesting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/HadoopSecurityEnabledUserProviderForTesting.java
new file mode 100644
index 0000000..3ebda29
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/HadoopSecurityEnabledUserProviderForTesting.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+/**
+ * A {@link UserProvider} that always says Hadoop security is enabled, regardless of the underlying
+ * configuration. HBase security is <i>not enabled</i>, since that flag determines whether SASL is
+ * used for authentication, which requires a Kerberos ticket (which we currently don't have in
+ * tests).
+ * <p>
+ * This should only be used for <b>TESTING</b>.
+ */
+public class HadoopSecurityEnabledUserProviderForTesting extends UserProvider {
+
+ @Override
+ public boolean isHBaseSecurityEnabled() {
+ return false;
+ }
+
+ @Override
+ public boolean isHadoopSecurityEnabled() {
+ return true;
+ }
+}
\ No newline at end of file
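
For context, a test that wants SASL on the wire without a full Kerberos setup would install this provider before starting the mini cluster. A hedged sketch, assuming the setUserProviderForTesting hook on UserProvider that the secure bulk load tests rely on:

    Configuration conf = util.getConfiguration();
    // force isHadoopSecurityEnabled() == true for this test run (hook name assumed)
    UserProvider.setUserProviderForTesting(conf,
        HadoopSecurityEnabledUserProviderForTesting.class);
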
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
index 6583366..1e38179 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
@@ -25,6 +25,12 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import com.google.protobuf.BlockingRpcChannel;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -93,7 +99,6 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.locking.LockProcedure;
@@ -118,11 +123,9 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.Permission.Action;
-import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProcedureProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.SecurityTests;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.log4j.Level;
@@ -134,11 +137,9 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
-import com.google.protobuf.BlockingRpcChannel;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProcedureProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
/**
* Performs authorization checks for common operations, according to different
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/MapreduceTestingShim.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/MapreduceTestingShim.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/MapreduceTestingShim.java
new file mode 100644
index 0000000..3f7d441
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/MapreduceTestingShim.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.tool;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+
+/**
+ * This class provides shims for HBase to interact with the Hadoop 1.0.x and
+ * Hadoop 0.23.x series.
+ *
+ * NOTE: Not tested against 0.22.x or 0.21.x.
+ */
+abstract public class MapreduceTestingShim {
+ private static MapreduceTestingShim instance;
+ private static Class[] emptyParam = new Class[] {};
+
+ static {
+ try {
+ // This class exists in Hadoop 0.22+ but not in Hadoop 0.20.x/1.x
+ Class c = Class
+ .forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
+ instance = new MapreduceV2Shim();
+ } catch (Exception e) {
+ instance = new MapreduceV1Shim();
+ }
+ }
+
+ abstract public JobContext newJobContext(Configuration jobConf)
+ throws IOException;
+
+ abstract public Job newJob(Configuration conf) throws IOException;
+
+ abstract public JobConf obtainJobConf(MiniMRCluster cluster);
+
+ abstract public String obtainMROutputDirProp();
+
+ public static JobContext createJobContext(Configuration jobConf)
+ throws IOException {
+ return instance.newJobContext(jobConf);
+ }
+
+ public static JobConf getJobConf(MiniMRCluster cluster) {
+ return instance.obtainJobConf(cluster);
+ }
+
+ public static Job createJob(Configuration conf) throws IOException {
+ return instance.newJob(conf);
+ }
+
+ public static String getMROutputDirProp() {
+ return instance.obtainMROutputDirProp();
+ }
+
+ private static class MapreduceV1Shim extends MapreduceTestingShim {
+ public JobContext newJobContext(Configuration jobConf) throws IOException {
+ // Implementing:
+ // return new JobContext(jobConf, new JobID());
+ JobID jobId = new JobID();
+ Constructor<JobContext> c;
+ try {
+ c = JobContext.class.getConstructor(Configuration.class, JobID.class);
+ return c.newInstance(jobConf, jobId);
+ } catch (Exception e) {
+ throw new IllegalStateException(
+ "Failed to instantiate new JobContext(jobConf, new JobID())", e);
+ }
+ }
+
+ @Override
+ public Job newJob(Configuration conf) throws IOException {
+ // Implementing:
+ // return new Job(conf);
+ Constructor<Job> c;
+ try {
+ c = Job.class.getConstructor(Configuration.class);
+ return c.newInstance(conf);
+ } catch (Exception e) {
+ throw new IllegalStateException(
+ "Failed to instantiate new Job(conf)", e);
+ }
+ }
+
+ public JobConf obtainJobConf(MiniMRCluster cluster) {
+ if (cluster == null) return null;
+ try {
+ Object runner = cluster.getJobTrackerRunner();
+ Method meth = runner.getClass().getDeclaredMethod("getJobTracker", emptyParam);
+ Object tracker = meth.invoke(runner, new Object []{});
+ Method m = tracker.getClass().getDeclaredMethod("getConf", emptyParam);
+ return (JobConf) m.invoke(tracker, new Object []{});
+ } catch (NoSuchMethodException nsme) {
+ return null;
+ } catch (InvocationTargetException ite) {
+ return null;
+ } catch (IllegalAccessException iae) {
+ return null;
+ }
+ }
+
+ @Override
+ public String obtainMROutputDirProp() {
+ return "mapred.output.dir";
+ }
+ };
+
+ private static class MapreduceV2Shim extends MapreduceTestingShim {
+ public JobContext newJobContext(Configuration jobConf) {
+ return newJob(jobConf);
+ }
+
+ @Override
+ public Job newJob(Configuration jobConf) {
+ // Implementing:
+ // return Job.getInstance(jobConf);
+ try {
+ Method m = Job.class.getMethod("getInstance", Configuration.class);
+ return (Job) m.invoke(null, jobConf); // getInstance is static, so the receiver is null
+ } catch (Exception e) {
+ throw new IllegalStateException(
+ "Failed to invoke Job.getInstance(jobConf)", e);
+ }
+ }
+
+ public JobConf obtainJobConf(MiniMRCluster cluster) {
+ try {
+ Method meth = MiniMRCluster.class.getMethod("getJobTrackerConf", emptyParam);
+ return (JobConf) meth.invoke(cluster, new Object []{});
+ } catch (NoSuchMethodException nsme) {
+ return null;
+ } catch (InvocationTargetException ite) {
+ return null;
+ } catch (IllegalAccessException iae) {
+ return null;
+ }
+ }
+
+ @Override
+ public String obtainMROutputDirProp() {
+ // This is a copy of o.a.h.mapreduce.lib.output.FileOutputFormat.OUTDIR
+ // from Hadoop 0.23.x. If we use the source directly we break the hadoop 1.x compile.
+ return "mapreduce.output.fileoutputformat.outputdir";
+ }
+ };
+
+}
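
Test code never touches the reflection above directly; it goes through the static helpers, so the same test compiles against either Hadoop line. A short usage sketch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.JobContext;

    Configuration conf = new Configuration();
    // create a Job/JobContext without compiling against a specific Hadoop constructor
    Job job = MapreduceTestingShim.createJob(conf);
    JobContext ctx = MapreduceTestingShim.createJobContext(conf);
    // "mapred.output.dir" on MR1, "mapreduce.output.fileoutputformat.outputdir" on MR2
    String outDirKey = MapreduceTestingShim.getMROutputDirProp();
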
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java
new file mode 100644
index 0000000..7e4d40e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java
@@ -0,0 +1,723 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.tool;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+/**
+ * Test cases for the "load" half of the HFileOutputFormat bulk load functionality. These tests run
+ * faster than the full MR cluster tests in TestHFileOutputFormat.
+ */
+@Category({ MiscTests.class, LargeTests.class })
+public class TestLoadIncrementalHFiles {
+ @Rule
+ public TestName tn = new TestName();
+
+ private static final byte[] QUALIFIER = Bytes.toBytes("myqual");
+ private static final byte[] FAMILY = Bytes.toBytes("myfam");
+ private static final String NAMESPACE = "bulkNS";
+
+ static final String EXPECTED_MSG_FOR_NON_EXISTING_FAMILY = "Unmatched family names found";
+ static final int MAX_FILES_PER_REGION_PER_FAMILY = 4;
+
+ private static final byte[][] SPLIT_KEYS =
+ new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ppp") };
+
+ static HBaseTestingUtility util = new HBaseTestingUtility();
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
+ util.getConfiguration().setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
+ MAX_FILES_PER_REGION_PER_FAMILY);
+ // change default behavior so that tag values are returned with normal rpcs
+ util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
+ KeyValueCodecWithTags.class.getCanonicalName());
+ util.startMiniCluster();
+
+ setupNamespace();
+ }
+
+ protected static void setupNamespace() throws Exception {
+ util.getAdmin().createNamespace(NamespaceDescriptor.create(NAMESPACE).build());
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ util.shutdownMiniCluster();
+ }
+
+ @Test(timeout = 120000)
+ public void testSimpleLoadWithMap() throws Exception {
+ runTest("testSimpleLoadWithMap", BloomType.NONE,
+ new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
+ new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, },
+ true);
+ }
+
+ /**
+ * Test case that creates some regions and loads HFiles that fit snugly inside those regions
+ */
+ @Test(timeout = 120000)
+ public void testSimpleLoad() throws Exception {
+ runTest("testSimpleLoad", BloomType.NONE,
+ new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
+ new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, });
+ }
+
+ @Test(timeout = 120000)
+ public void testSimpleLoadWithFileCopy() throws Exception {
+ String testName = tn.getMethodName();
+ final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName);
+ runTest(testName, buildHTD(TableName.valueOf(TABLE_NAME), BloomType.NONE), BloomType.NONE,
+ false, null, new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
+ new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, },
+ false, true);
+ }
+
+ /**
+ * Test case that creates some regions and loads HFiles that cross the boundaries of those regions
+ */
+ @Test(timeout = 120000)
+ public void testRegionCrossingLoad() throws Exception {
+ runTest("testRegionCrossingLoad", BloomType.NONE,
+ new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
+ new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
+ }
+
+ /**
+ * Test loading into a column family that has a ROW bloom filter.
+ */
+ @Test(timeout = 60000)
+ public void testRegionCrossingRowBloom() throws Exception {
+ runTest("testRegionCrossingLoadRowBloom", BloomType.ROW,
+ new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
+ new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
+ }
+
+ /**
+ * Test loading into a column family that has a ROWCOL bloom filter.
+ */
+ @Test(timeout = 120000)
+ public void testRegionCrossingRowColBloom() throws Exception {
+ runTest("testRegionCrossingLoadRowColBloom", BloomType.ROWCOL,
+ new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
+ new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
+ }
+
+ /**
+ * Test case that creates some regions and loads HFiles that have different region boundaries
+ * than the table's pre-split regions.
+ */
+ @Test(timeout = 120000)
+ public void testSimpleHFileSplit() throws Exception {
+ runTest("testHFileSplit", BloomType.NONE,
+ new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
+ Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), },
+ new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("lll") },
+ new byte[][] { Bytes.toBytes("mmm"), Bytes.toBytes("zzz") }, });
+ }
+
+ /**
+ * Test case that creates some regions and loads HFiles that cross region boundaries and have
+ * different boundaries than the table's pre-split regions.
+ */
+ @Test(timeout = 60000)
+ public void testRegionCrossingHFileSplit() throws Exception {
+ testRegionCrossingHFileSplit(BloomType.NONE);
+ }
+
+ /**
+ * Test case that creates some regions and loads HFiles that cross region boundaries, have a ROW
+ * bloom filter, and have different boundaries than the table's pre-split regions.
+ */
+ @Test(timeout = 120000)
+ public void testRegionCrossingHFileSplitRowBloom() throws Exception {
+ testRegionCrossingHFileSplit(BloomType.ROW);
+ }
+
+ /**
+ * Test case that creates some regions and loads HFiles that cross region boundaries, have a
+ * ROWCOL bloom filter, and have different boundaries than the table's pre-split regions.
+ */
+ @Test(timeout = 120000)
+ public void testRegionCrossingHFileSplitRowColBloom() throws Exception {
+ testRegionCrossingHFileSplit(BloomType.ROWCOL);
+ }
+
+ @Test
+ public void testSplitALot() throws Exception {
+ runTest("testSplitALot", BloomType.NONE,
+ new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("bbb"), Bytes.toBytes("ccc"),
+ Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"), Bytes.toBytes("ggg"),
+ Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"),
+ Bytes.toBytes("nnn"), Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
+ Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"),
+ Bytes.toBytes("vvv"), Bytes.toBytes("zzz"), },
+ new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("zzz") }, });
+ }
+
+ private void testRegionCrossingHFileSplit(BloomType bloomType) throws Exception {
+ runTest("testHFileSplit" + bloomType + "Bloom", bloomType,
+ new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
+ Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), },
+ new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
+ new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
+ }
+
+ private TableDescriptor buildHTD(TableName tableName, BloomType bloomType) {
+ return TableDescriptorBuilder.newBuilder(tableName)
+ .addColumnFamily(
+ ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBloomFilterType(bloomType).build())
+ .build();
+ }
+
+ private void runTest(String testName, BloomType bloomType, byte[][][] hfileRanges)
+ throws Exception {
+ runTest(testName, bloomType, null, hfileRanges);
+ }
+
+ private void runTest(String testName, BloomType bloomType, byte[][][] hfileRanges, boolean useMap)
+ throws Exception {
+ runTest(testName, bloomType, null, hfileRanges, useMap);
+ }
+
+ private void runTest(String testName, BloomType bloomType, byte[][] tableSplitKeys,
+ byte[][][] hfileRanges) throws Exception {
+ runTest(testName, bloomType, tableSplitKeys, hfileRanges, false);
+ }
+
+ private void runTest(String testName, BloomType bloomType, byte[][] tableSplitKeys,
+ byte[][][] hfileRanges, boolean useMap) throws Exception {
+ final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName);
+ final boolean preCreateTable = tableSplitKeys != null;
+
+ // Run the test bulkloading the table to the default namespace
+ final TableName TABLE_WITHOUT_NS = TableName.valueOf(TABLE_NAME);
+ runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges,
+ useMap);
+
+ // Run the test bulkloading the table to the specified namespace
+ final TableName TABLE_WITH_NS = TableName.valueOf(Bytes.toBytes(NAMESPACE), TABLE_NAME);
+ runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges,
+ useMap);
+ }
+
+ private void runTest(String testName, TableName tableName, BloomType bloomType,
+ boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap)
+ throws Exception {
+ TableDescriptor htd = buildHTD(tableName, bloomType);
+ runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap, false);
+ }
+
+ public static int loadHFiles(String testName, TableDescriptor htd, HBaseTestingUtility util,
+ byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys,
+ byte[][][] hfileRanges, boolean useMap, boolean deleteFile, boolean copyFiles,
+ int initRowCount, int factor) throws Exception {
+ Path dir = util.getDataTestDirOnTestFS(testName);
+ FileSystem fs = util.getTestFileSystem();
+ dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+ Path familyDir = new Path(dir, Bytes.toString(fam));
+
+ int hfileIdx = 0;
+ Map<byte[], List<Path>> map = null;
+ List<Path> list = null;
+ if (useMap || copyFiles) {
+ list = new ArrayList<>();
+ }
+ if (useMap) {
+ map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ map.put(fam, list);
+ }
+ Path last = null;
+ for (byte[][] range : hfileRanges) {
+ byte[] from = range[0];
+ byte[] to = range[1];
+ Path path = new Path(familyDir, "hfile_" + hfileIdx++);
+ HFileTestUtil.createHFile(util.getConfiguration(), fs, path, fam, qual, from, to, factor);
+ if (useMap) {
+ last = path;
+ list.add(path);
+ }
+ }
+ int expectedRows = hfileIdx * factor;
+
+ TableName tableName = htd.getTableName();
+ if (!util.getAdmin().tableExists(tableName) && (preCreateTable || map != null)) {
+ util.getAdmin().createTable(htd, tableSplitKeys);
+ }
+
+ Configuration conf = util.getConfiguration();
+ if (copyFiles) {
+ conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true);
+ }
+ LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
+ String[] args = { dir.toString(), tableName.toString() };
+ if (useMap) {
+ if (deleteFile) {
+ fs.delete(last, true);
+ }
+ Map<LoadQueueItem, ByteBuffer> loaded = loader.run(map, tableName);
+ if (deleteFile) {
+ expectedRows -= 1000;
+ for (LoadQueueItem item : loaded.keySet()) {
+ if (item.getFilePath().getName().equals(last.getName())) {
+ fail(last + " should be missing");
+ }
+ }
+ }
+ } else {
+ loader.run(args);
+ }
+
+ if (copyFiles) {
+ for (Path p : list) {
+ assertTrue(p + " should exist", fs.exists(p));
+ }
+ }
+
+ Table table = util.getConnection().getTable(tableName);
+ try {
+ assertEquals(initRowCount + expectedRows, util.countRows(table));
+ } finally {
+ table.close();
+ }
+
+ return expectedRows;
+ }
+
+ private void runTest(String testName, TableDescriptor htd, BloomType bloomType,
+ boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap,
+ boolean copyFiles) throws Exception {
+ loadHFiles(testName, htd, util, FAMILY, QUALIFIER, preCreateTable, tableSplitKeys, hfileRanges,
+ useMap, true, copyFiles, 0, 1000);
+
+ final TableName tableName = htd.getTableName();
+ // verify staging folder has been cleaned up
+ Path stagingBasePath =
+ new Path(FSUtils.getRootDir(util.getConfiguration()), HConstants.BULKLOAD_STAGING_DIR_NAME);
+ FileSystem fs = util.getTestFileSystem();
+ if (fs.exists(stagingBasePath)) {
+ FileStatus[] files = fs.listStatus(stagingBasePath);
+ for (FileStatus file : files) {
+ assertTrue("Folder=" + file.getPath() + " is not cleaned up.",
+ file.getPath().getName() != "DONOTERASE");
+ }
+ }
+
+ util.deleteTable(tableName);
+ }
+
+ /**
+ * Test that tags survive through a bulk load that needs to split hfiles. This test depends on
+ * "hbase.client.rpc.codec" being set to KeyValueCodecWithTags so that the client receives tags
+ * in the responses.
+ */
+ @Test(timeout = 60000)
+ public void testTagsSurviveBulkLoadSplit() throws Exception {
+ Path dir = util.getDataTestDirOnTestFS(tn.getMethodName());
+ FileSystem fs = util.getTestFileSystem();
+ dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+ Path familyDir = new Path(dir, Bytes.toString(FAMILY));
+ // table has these split points
+ byte[][] tableSplitKeys = new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"),
+ Bytes.toBytes("jjj"), Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), };
+
+ // creating an hfile that has values that span the split points.
+ byte[] from = Bytes.toBytes("ddd");
+ byte[] to = Bytes.toBytes("ooo");
+ HFileTestUtil.createHFileWithTags(util.getConfiguration(), fs,
+ new Path(familyDir, tn.getMethodName() + "_hfile"), FAMILY, QUALIFIER, from, to, 1000);
+ int expectedRows = 1000;
+
+ TableName tableName = TableName.valueOf(tn.getMethodName());
+ TableDescriptor htd = buildHTD(tableName, BloomType.NONE);
+ util.getAdmin().createTable(htd, tableSplitKeys);
+
+ LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
+ String[] args = { dir.toString(), tableName.toString() };
+ loader.run(args);
+
+ Table table = util.getConnection().getTable(tableName);
+ try {
+ assertEquals(expectedRows, util.countRows(table));
+ HFileTestUtil.verifyTags(table);
+ } finally {
+ table.close();
+ }
+
+ util.deleteTable(tableName);
+ }
+
+ /**
+ * Test loading into a column family that does not exist.
+ */
+ @Test(timeout = 60000)
+ public void testNonexistentColumnFamilyLoad() throws Exception {
+ String testName = tn.getMethodName();
+ byte[][][] hFileRanges =
+ new byte[][][] { new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("ccc") },
+ new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, };
+
+ byte[] TABLE = Bytes.toBytes("mytable_" + testName);
+ // set the real family name to upper case on purpose to simulate the case where the
+ // family name in the HFiles does not match any family in the table
+ TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE))
+ .addColumnFamily(ColumnFamilyDescriptorBuilder
+ .of(Bytes.toBytes(new String(FAMILY).toUpperCase(Locale.ROOT))))
+ .build();
+
+ try {
+ runTest(testName, htd, BloomType.NONE, true, SPLIT_KEYS, hFileRanges, false, false);
+ assertTrue("Loading into table with non-existent family should have failed", false);
+ } catch (Exception e) {
+ assertTrue("IOException expected", e instanceof IOException);
+ // further check whether the exception message is correct
+ String errMsg = e.getMessage();
+ assertTrue(
+ "Incorrect exception message, expected message: [" + EXPECTED_MSG_FOR_NON_EXISTING_FAMILY +
+ "], current message: [" + errMsg + "]",
+ errMsg.contains(EXPECTED_MSG_FOR_NON_EXISTING_FAMILY));
+ }
+ }
+
+ @Test(timeout = 120000)
+ public void testNonHfileFolderWithUnmatchedFamilyName() throws Exception {
+ testNonHfileFolder("testNonHfileFolderWithUnmatchedFamilyName", true);
+ }
+
+ @Test(timeout = 120000)
+ public void testNonHfileFolder() throws Exception {
+ testNonHfileFolder("testNonHfileFolder", false);
+ }
+
+ /**
+ * Write a random data file and a non-file in a dir with a valid family name that is not part of
+ * the table's families. We should be able to bulk load without getting the unmatched family
+ * exception.
+ * HBASE-13037/HBASE-13227
+ */
+ private void testNonHfileFolder(String tableName, boolean preCreateTable) throws Exception {
+ Path dir = util.getDataTestDirOnTestFS(tableName);
+ FileSystem fs = util.getTestFileSystem();
+ dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+
+ Path familyDir = new Path(dir, Bytes.toString(FAMILY));
+ HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_0"), FAMILY,
+ QUALIFIER, Bytes.toBytes("begin"), Bytes.toBytes("end"), 500);
+ createRandomDataFile(fs, new Path(familyDir, "012356789"), 16 * 1024);
+
+ final String NON_FAMILY_FOLDER = "_logs";
+ Path nonFamilyDir = new Path(dir, NON_FAMILY_FOLDER);
+ fs.mkdirs(nonFamilyDir);
+ fs.mkdirs(new Path(nonFamilyDir, "non-file"));
+ createRandomDataFile(fs, new Path(nonFamilyDir, "012356789"), 16 * 1024);
+
+ Table table = null;
+ try {
+ if (preCreateTable) {
+ table = util.createTable(TableName.valueOf(tableName), FAMILY);
+ } else {
+ table = util.getConnection().getTable(TableName.valueOf(tableName));
+ }
+
+ final String[] args = { dir.toString(), tableName };
+ new LoadIncrementalHFiles(util.getConfiguration()).run(args);
+ assertEquals(500, util.countRows(table));
+ } finally {
+ if (table != null) {
+ table.close();
+ }
+ fs.delete(dir, true);
+ }
+ }
+
+ private static void createRandomDataFile(FileSystem fs, Path path, int size) throws IOException {
+ FSDataOutputStream stream = fs.create(path);
+ try {
+ byte[] data = new byte[1024];
+ for (int i = 0; i < data.length; ++i) {
+ data[i] = (byte) (i & 0xff);
+ }
+ while (size >= data.length) {
+ stream.write(data, 0, data.length);
+ size -= data.length;
+ }
+ if (size > 0) {
+ stream.write(data, 0, size);
+ }
+ } finally {
+ stream.close();
+ }
+ }
+
+ @Test(timeout = 120000)
+ public void testSplitStoreFile() throws IOException {
+ Path dir = util.getDataTestDirOnTestFS("testSplitHFile");
+ FileSystem fs = util.getTestFileSystem();
+ Path testIn = new Path(dir, "testhfile");
+ ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY);
+ HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
+ Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
+
+ Path bottomOut = new Path(dir, "bottom.out");
+ Path topOut = new Path(dir, "top.out");
+
+ LoadIncrementalHFiles.splitStoreFile(util.getConfiguration(), testIn, familyDesc,
+ Bytes.toBytes("ggg"), bottomOut, topOut);
+
+ int rowCount = verifyHFile(bottomOut);
+ rowCount += verifyHFile(topOut);
+ assertEquals(1000, rowCount);
+ }
+
+ @Test
+ public void testSplitStoreFileWithNoneToNone() throws IOException {
+ testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.NONE);
+ }
+
+ @Test
+ public void testSplitStoreFileWithEncodedToEncoded() throws IOException {
+ testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.DIFF);
+ }
+
+ @Test
+ public void testSplitStoreFileWithEncodedToNone() throws IOException {
+ testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.NONE);
+ }
+
+ @Test
+ public void testSplitStoreFileWithNoneToEncoded() throws IOException {
+ testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.DIFF);
+ }
+
+ private void testSplitStoreFileWithDifferentEncoding(DataBlockEncoding bulkloadEncoding,
+ DataBlockEncoding cfEncoding) throws IOException {
+ Path dir = util.getDataTestDirOnTestFS("testSplitHFileWithDifferentEncoding");
+ FileSystem fs = util.getTestFileSystem();
+ Path testIn = new Path(dir, "testhfile");
+ ColumnFamilyDescriptor familyDesc =
+ ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setDataBlockEncoding(cfEncoding).build();
+ HFileTestUtil.createHFileWithDataBlockEncoding(util.getConfiguration(), fs, testIn,
+ bulkloadEncoding, FAMILY, QUALIFIER, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
+
+ Path bottomOut = new Path(dir, "bottom.out");
+ Path topOut = new Path(dir, "top.out");
+
+ LoadIncrementalHFiles.splitStoreFile(util.getConfiguration(), testIn, familyDesc,
+ Bytes.toBytes("ggg"), bottomOut, topOut);
+
+ int rowCount = verifyHFile(bottomOut);
+ rowCount += verifyHFile(topOut);
+ assertEquals(1000, rowCount);
+ }
+
+ private int verifyHFile(Path p) throws IOException {
+ Configuration conf = util.getConfiguration();
+ HFile.Reader reader =
+ HFile.createReader(p.getFileSystem(conf), p, new CacheConfig(conf), true, conf);
+ reader.loadFileInfo();
+ HFileScanner scanner = reader.getScanner(false, false);
+ scanner.seekTo();
+ int count = 0;
+ do {
+ count++;
+ } while (scanner.next());
+ assertTrue(count > 0);
+ reader.close();
+ return count;
+ }
+
+ private void addStartEndKeysForTest(TreeMap<byte[], Integer> map, byte[] first, byte[] last) {
+ Integer value = map.containsKey(first) ? map.get(first) : 0;
+ map.put(first, value + 1);
+
+ value = map.containsKey(last) ? map.get(last) : 0;
+ map.put(last, value - 1);
+ }
+
+ @Test(timeout = 120000)
+ public void testInferBoundaries() {
+ TreeMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+
+ /*
+ * Toy example:
+ *  c---------i            o------p          s---------t     v------x
+ * a------e    g-----k   m-------------q   r----s            u----w
+ *
+ * Should be inferred as:
+ * a-----------------k   m-------------q   r--------------t  u---------x
+ *
+ * The output should be (m,r,u)
+ */
+
+ String first;
+ String last;
+
+ first = "a";
+ last = "e";
+ addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
+
+ first = "r";
+ last = "s";
+ addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
+
+ first = "o";
+ last = "p";
+ addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
+
+ first = "g";
+ last = "k";
+ addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
+
+ first = "v";
+ last = "x";
+ addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
+
+ first = "c";
+ last = "i";
+ addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
+
+ first = "m";
+ last = "q";
+ addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
+
+ first = "s";
+ last = "t";
+ addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
+
+ first = "u";
+ last = "w";
+ addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
+
+ byte[][] keysArray = LoadIncrementalHFiles.inferBoundaries(map);
+ byte[][] compare = new byte[3][];
+ compare[0] = "m".getBytes();
+ compare[1] = "r".getBytes();
+ compare[2] = "u".getBytes();
+
+ assertEquals(3, keysArray.length);
+
+ for (int row = 0; row < keysArray.length; row++) {
+ assertArrayEquals(compare[row], keysArray[row]);
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testLoadTooMayHFiles() throws Exception {
+ Path dir = util.getDataTestDirOnTestFS("testLoadTooMayHFiles");
+ FileSystem fs = util.getTestFileSystem();
+ dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+ Path familyDir = new Path(dir, Bytes.toString(FAMILY));
+
+ byte[] from = Bytes.toBytes("begin");
+ byte[] to = Bytes.toBytes("end");
+ for (int i = 0; i <= MAX_FILES_PER_REGION_PER_FAMILY; i++) {
+ HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_" + i),
+ FAMILY, QUALIFIER, from, to, 1000);
+ }
+
+ LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
+ String[] args = { dir.toString(), "mytable_testLoadTooMayHFiles" };
+ try {
+ loader.run(args);
+ fail("Bulk loading too many files should fail");
+ } catch (IOException ie) {
+ assertTrue(ie.getMessage()
+ .contains("Trying to load more than " + MAX_FILES_PER_REGION_PER_FAMILY + " hfiles"));
+ }
+ }
+
+ @Test(expected = TableNotFoundException.class)
+ public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception {
+ Configuration conf = util.getConfiguration();
+ conf.set(LoadIncrementalHFiles.CREATE_TABLE_CONF_KEY, "no");
+ LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
+ String[] args = { "directory", "nonExistingTable" };
+ loader.run(args);
+ }
+
+ @Test(timeout = 120000)
+ public void testTableWithCFNameStartWithUnderScore() throws Exception {
+ Path dir = util.getDataTestDirOnTestFS("cfNameStartWithUnderScore");
+ FileSystem fs = util.getTestFileSystem();
+ dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+ String family = "_cf";
+ Path familyDir = new Path(dir, family);
+
+ byte[] from = Bytes.toBytes("begin");
+ byte[] to = Bytes.toBytes("end");
+ Configuration conf = util.getConfiguration();
+ String tableName = tn.getMethodName();
+ Table table = util.createTable(TableName.valueOf(tableName), family);
+ HFileTestUtil.createHFile(conf, fs, new Path(familyDir, "hfile"), Bytes.toBytes(family),
+ QUALIFIER, from, to, 1000);
+
+ LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
+ String[] args = { dir.toString(), tableName };
+ try {
+ loader.run(args);
+ assertEquals(1000, util.countRows(table));
+ } finally {
+ if (null != table) {
+ table.close();
+ }
+ }
+ }
+}
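
Besides the Tool-style run(args) entry point these tests exercise, the class also exposes the programmatic doBulkLoad API that the split-recovery tests below drive directly. A hedged sketch (connection setup, path, and table name are placeholders):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.RegionLocator;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;

    TableName tn = TableName.valueOf("mytable");
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(tn);
        RegionLocator locator = conn.getRegionLocator(tn);
        Admin admin = conn.getAdmin()) {
      // splits HFiles that straddle region boundaries before loading them atomically
      new LoadIncrementalHFiles(conf).doBulkLoad(new Path("/tmp/hfiles"), admin, table, locator);
    }
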
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java
new file mode 100644
index 0000000..414a6cb
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java
@@ -0,0 +1,628 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.tool;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ClientServiceCallable;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Multimap;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.mockito.Mockito;
+
+/**
+ * Test cases for the atomic load error handling of the bulk load functionality.
+ */
+@Category({ MiscTests.class, LargeTests.class })
+public class TestLoadIncrementalHFilesSplitRecovery {
+ private static final Log LOG = LogFactory.getLog(TestLoadIncrementalHFilesSplitRecovery.class);
+
+ static HBaseTestingUtility util;
+ // used by secure subclass
+ static boolean useSecure = false;
+
+ final static int NUM_CFS = 10;
+ final static byte[] QUAL = Bytes.toBytes("qual");
+ final static int ROWCOUNT = 100;
+
+ private final static byte[][] families = new byte[NUM_CFS][];
+
+ @Rule
+ public TestName name = new TestName();
+
+ static {
+ for (int i = 0; i < NUM_CFS; i++) {
+ families[i] = Bytes.toBytes(family(i));
+ }
+ }
+
+ static byte[] rowkey(int i) {
+ return Bytes.toBytes(String.format("row_%08d", i));
+ }
+
+ static String family(int i) {
+ return String.format("family_%04d", i);
+ }
+
+ static byte[] value(int i) {
+ return Bytes.toBytes(String.format("%010d", i));
+ }
+
+ public static void buildHFiles(FileSystem fs, Path dir, int value) throws IOException {
+ byte[] val = value(value);
+ for (int i = 0; i < NUM_CFS; i++) {
+ Path testIn = new Path(dir, family(i));
+
+ TestHRegionServerBulkLoad.createHFile(fs, new Path(testIn, "hfile_" + i),
+ Bytes.toBytes(family(i)), QUAL, val, ROWCOUNT);
+ }
+ }
+
+ private TableDescriptor createTableDesc(TableName name, int cfs) {
+ TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
+ IntStream.range(0, cfs).mapToObj(i -> ColumnFamilyDescriptorBuilder.of(family(i)))
+ .forEachOrdered(builder::addColumnFamily);
+ return builder.build();
+ }
+
+ /**
+ * Creates a table with the given name and the specified number of column families, if the table
+ * does not already exist.
+ */
+ private void setupTable(final Connection connection, TableName table, int cfs)
+ throws IOException {
+ try {
+ LOG.info("Creating table " + table);
+ try (Admin admin = connection.getAdmin()) {
+ admin.createTable(createTableDesc(table, cfs));
+ }
+ } catch (TableExistsException tee) {
+ LOG.info("Table " + table + " already exists");
+ }
+ }
+
+ /**
+ * Creates a table with the given name, the specified number of column families, and the given
+ * split keys, if the table does not already exist.
+ * @param table the table to create
+ * @param cfs the number of column families
+ * @param SPLIT_KEYS the keys to pre-split the table with
+ */
+ private void setupTableWithSplitkeys(TableName table, int cfs, byte[][] SPLIT_KEYS)
+ throws IOException {
+ try {
+ LOG.info("Creating table " + table);
+ util.createTable(createTableDesc(table, cfs), SPLIT_KEYS);
+ } catch (TableExistsException tee) {
+ LOG.info("Table " + table + " already exists");
+ }
+ }
+
+ private Path buildBulkFiles(TableName table, int value) throws Exception {
+ Path dir = util.getDataTestDirOnTestFS(table.getNameAsString());
+ Path bulk1 = new Path(dir, table.getNameAsString() + value);
+ FileSystem fs = util.getTestFileSystem();
+ buildHFiles(fs, bulk1, value);
+ return bulk1;
+ }
+
+ /**
+ * Populate table with known values.
+ */
+ private void populateTable(final Connection connection, TableName table, int value)
+ throws Exception {
+ // create HFiles for different column families
+ LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration());
+ Path bulk1 = buildBulkFiles(table, value);
+ try (Table t = connection.getTable(table);
+ RegionLocator locator = connection.getRegionLocator(table);
+ Admin admin = connection.getAdmin()) {
+ lih.doBulkLoad(bulk1, admin, t, locator);
+ }
+ }
+
+ /**
+ * Split the known table in half (the split point is hard-coded for this test suite).
+ */
+ private void forceSplit(TableName table) {
+ try {
+ // we would need to call the region server directly to make the split synchronous, but that API isn't visible.
+ HRegionServer hrs = util.getRSForFirstRegionInTable(table);
+
+ for (HRegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
+ if (hri.getTable().equals(table)) {
+ util.getAdmin().splitRegionAsync(hri.getRegionName(), rowkey(ROWCOUNT / 2));
+ // ProtobufUtil.split(null, hrs.getRSRpcServices(), hri, rowkey(ROWCOUNT / 2));
+ }
+ }
+
+ // verify that split completed.
+ int regions;
+ do {
+ regions = 0;
+ for (HRegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
+ if (hri.getTable().equals(table)) {
+ regions++;
+ }
+ }
+ if (regions != 2) {
+ LOG.info("Taking some time to complete split...");
+ Thread.sleep(250);
+ }
+ } while (regions != 2);
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ util = new HBaseTestingUtility();
+ util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
+ util.startMiniCluster(1);
+ }
+
+ @AfterClass
+ public static void teardownCluster() throws Exception {
+ util.shutdownMiniCluster();
+ }
+
+ /**
+ * Checks that all columns have the expected value and that there is the expected number of rows.
+ * @throws IOException
+ */
+ void assertExpectedTable(TableName table, int count, int value) throws IOException {
+ List<TableDescriptor> htds = util.getAdmin().listTableDescriptors(table.getNameAsString());
+ assertEquals(1, htds.size());
+ try (Table t = util.getConnection().getTable(table);
+ ResultScanner sr = t.getScanner(new Scan())) {
+ int i = 0;
+ for (Result r; (r = sr.next()) != null;) {
+ r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream())
+ .forEach(v -> assertArrayEquals(value(value), v));
+ i++;
+ }
+ assertEquals(count, i);
+ } catch (IOException e) {
+ fail("Failed due to exception");
+ }
+ }
+
+ /**
+ * Test that shows that an exception thrown from the RS side will result in an exception on the
+ * LIHFile client.
+ */
+ @Test(expected = IOException.class, timeout = 120000)
+ public void testBulkLoadPhaseFailure() throws Exception {
+ final TableName table = TableName.valueOf(name.getMethodName());
+ final AtomicInteger attmptedCalls = new AtomicInteger();
+ final AtomicInteger failedCalls = new AtomicInteger();
+ util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+ try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
+ setupTable(connection, table, 10);
+ LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
+ @Override
+ protected List<LoadQueueItem> tryAtomicRegionLoad(
+ ClientServiceCallable<byte[]> serviceCallable, TableName tableName, final byte[] first,
+ Collection<LoadQueueItem> lqis) throws IOException {
+ int i = attmptedCalls.incrementAndGet();
+ if (i == 1) {
+ Connection errConn;
+ try {
+ errConn = getMockedConnection(util.getConfiguration());
+ serviceCallable = this.buildClientServiceCallable(errConn, table, first, lqis, true);
+ } catch (Exception e) {
+ LOG.fatal("mocking cruft, should never happen", e);
+ throw new RuntimeException("mocking cruft, should never happen");
+ }
+ failedCalls.incrementAndGet();
+ return super.tryAtomicRegionLoad(serviceCallable, tableName, first, lqis);
+ }
+
+ return super.tryAtomicRegionLoad(serviceCallable, tableName, first, lqis);
+ }
+ };
+ try {
+ // create HFiles for different column families
+ Path dir = buildBulkFiles(table, 1);
+ try (Table t = connection.getTable(table);
+ RegionLocator locator = connection.getRegionLocator(table);
+ Admin admin = connection.getAdmin()) {
+ lih.doBulkLoad(dir, admin, t, locator);
+ }
+ } finally {
+ util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+ HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+ }
+ fail("doBulkLoad should have thrown an exception");
+ }
+ }
+
+ /**
+ * Test that shows that an exception thrown from the RS side will result in the expected number of
+ * retries set by {@link HConstants#HBASE_CLIENT_RETRIES_NUMBER} when
+ * {@link LoadIncrementalHFiles#RETRY_ON_IO_EXCEPTION} is set.
+ */
+ @Test
+ public void testRetryOnIOException() throws Exception {
+ final TableName table = TableName.valueOf(name.getMethodName());
+ final AtomicInteger calls = new AtomicInteger(1);
+ final Connection conn = ConnectionFactory.createConnection(util.getConfiguration());
+ util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+ util.getConfiguration().setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, true);
+ final LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
+ @Override
+ protected List<LoadQueueItem> tryAtomicRegionLoad(
+ ClientServiceCallable<byte[]> serverCallable, TableName tableName, final byte[] first,
+ Collection<LoadQueueItem> lqis) throws IOException {
+ if (calls.getAndIncrement() < util.getConfiguration().getInt(
+ HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) -
+ 1) {
+ ClientServiceCallable<byte[]> newServerCallable = new ClientServiceCallable<byte[]>(conn,
+ tableName, first, new RpcControllerFactory(util.getConfiguration()).newController(),
+ HConstants.PRIORITY_UNSET) {
+ @Override
+ public byte[] rpcCall() throws Exception {
+ throw new IOException("Error calling something on RegionServer");
+ }
+ };
+ return super.tryAtomicRegionLoad(newServerCallable, tableName, first, lqis);
+ } else {
+ return super.tryAtomicRegionLoad(serverCallable, tableName, first, lqis);
+ }
+ }
+ };
+ setupTable(conn, table, 10);
+ Path dir = buildBulkFiles(table, 1);
+ lih.doBulkLoad(dir, conn.getAdmin(), conn.getTable(table), conn.getRegionLocator(table));
+ util.getConfiguration().setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, false);
+
+ }
+
+ private ClusterConnection getMockedConnection(final Configuration conf)
+ throws IOException, org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException {
+ ClusterConnection c = Mockito.mock(ClusterConnection.class);
+ Mockito.when(c.getConfiguration()).thenReturn(conf);
+ Mockito.doNothing().when(c).close();
+ // Make it so we return a particular location when asked.
+ final HRegionLocation loc = new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO,
+ ServerName.valueOf("example.org", 1234, 0));
+ Mockito.when(
+ c.getRegionLocation((TableName) Mockito.any(), (byte[]) Mockito.any(), Mockito.anyBoolean()))
+ .thenReturn(loc);
+ Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any())).thenReturn(loc);
+ ClientProtos.ClientService.BlockingInterface hri =
+ Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
+ Mockito
+ .when(
+ hri.bulkLoadHFile((RpcController) Mockito.any(), (BulkLoadHFileRequest) Mockito.any()))
+ .thenThrow(new ServiceException(new IOException("injecting bulk load error")));
+ Mockito.when(c.getClient(Mockito.any(ServerName.class))).thenReturn(hri);
+ return c;
+ }
+
+ /**
+ * This test exercises the path where there is a split after initial validation but before the
+ * atomic bulk load call. We cannot use presplitting to test this path, so we actually inject a
+ * split just before the atomic region load.
+ */
+ @Test(timeout = 120000)
+ public void testSplitWhileBulkLoadPhase() throws Exception {
+ final TableName table = TableName.valueOf(name.getMethodName());
+ try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
+ setupTable(connection, table, 10);
+ populateTable(connection, table, 1);
+ assertExpectedTable(table, ROWCOUNT, 1);
+
+ // Now let's cause trouble. This will occur after checks and cause bulk
+ // files to fail when we attempt to atomically import them. This is recoverable.
+ final AtomicInteger attemptedCalls = new AtomicInteger();
+ LoadIncrementalHFiles lih2 = new LoadIncrementalHFiles(util.getConfiguration()) {
+ @Override
+ protected void bulkLoadPhase(final Table htable, final Connection conn,
+ ExecutorService pool, Deque<LoadQueueItem> queue,
+ final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile,
+ Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
+ int i = attemptedCalls.incrementAndGet();
+ if (i == 1) {
+ // On first attempt force a split.
+ forceSplit(table);
+ }
+ super.bulkLoadPhase(htable, conn, pool, queue, regionGroups, copyFile, item2RegionMap);
+ }
+ };
+
+ // create HFiles for different column families
+ try (Table t = connection.getTable(table);
+ RegionLocator locator = connection.getRegionLocator(table);
+ Admin admin = connection.getAdmin()) {
+ Path bulk = buildBulkFiles(table, 2);
+ lih2.doBulkLoad(bulk, admin, t, locator);
+ }
+
+ // check that data was loaded
+      // The three expected attempts are 1) failure because a split is needed,
+      // 2) load of the split top, and 3) load of the split bottom
+      assertEquals(3, attemptedCalls.get());
+ assertExpectedTable(table, ROWCOUNT, 2);
+ }
+ }
+
+ /**
+ * This test splits a table and attempts to bulk load. The bulk import files should be split
+ * before atomically importing.
+ */
+ @Test(timeout = 120000)
+ public void testGroupOrSplitPresplit() throws Exception {
+ final TableName table = TableName.valueOf(name.getMethodName());
+ try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
+ setupTable(connection, table, 10);
+ populateTable(connection, table, 1);
+ assertExpectedTable(connection, table, ROWCOUNT, 1);
+ forceSplit(table);
+
+ final AtomicInteger countedLqis = new AtomicInteger();
+ LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
+ @Override
+ protected Pair<List<LoadQueueItem>, String> groupOrSplit(
+ Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item,
+ final Table htable, final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
+ Pair<List<LoadQueueItem>, String> lqis =
+ super.groupOrSplit(regionGroups, item, htable, startEndKeys);
+ if (lqis != null && lqis.getFirst() != null) {
+ countedLqis.addAndGet(lqis.getFirst().size());
+ }
+ return lqis;
+ }
+ };
+
+ // create HFiles for different column families
+ Path bulk = buildBulkFiles(table, 2);
+ try (Table t = connection.getTable(table);
+ RegionLocator locator = connection.getRegionLocator(table);
+ Admin admin = connection.getAdmin()) {
+ lih.doBulkLoad(bulk, admin, t, locator);
+ }
+ assertExpectedTable(connection, table, ROWCOUNT, 2);
+ assertEquals(20, countedLqis.get());
+ }
+ }
+
+ /**
+   * This test creates a table with many small regions. The bulk load files must be split
+   * multiple times before all of them can be loaded successfully.
+ */
+ @Test(timeout = 120000)
+ public void testSplitTmpFileCleanUp() throws Exception {
+ final TableName table = TableName.valueOf(name.getMethodName());
+ byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"),
+ Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"), Bytes.toBytes("row_00000040"),
+ Bytes.toBytes("row_00000050") };
+ try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
+ setupTableWithSplitkeys(table, 10, SPLIT_KEYS);
+
+ LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration());
+
+ // create HFiles
+ Path bulk = buildBulkFiles(table, 2);
+ try (Table t = connection.getTable(table);
+ RegionLocator locator = connection.getRegionLocator(table);
+ Admin admin = connection.getAdmin()) {
+ lih.doBulkLoad(bulk, admin, t, locator);
+ }
+ // family path
+ Path tmpPath = new Path(bulk, family(0));
+ // TMP_DIR under family path
+ tmpPath = new Path(tmpPath, LoadIncrementalHFiles.TMP_DIR);
+ FileSystem fs = bulk.getFileSystem(util.getConfiguration());
+      // HFiles have been split, so TMP_DIR exists
+ assertTrue(fs.exists(tmpPath));
+      // TMP_DIR should have been cleaned up
+ assertNull(LoadIncrementalHFiles.TMP_DIR + " should be empty.",
+ FSUtils.listStatus(fs, tmpPath));
+ assertExpectedTable(connection, table, ROWCOUNT, 2);
+ }
+ }
+
+ /**
+   * This simulates a remote exception which should cause LIHF to exit with an exception.
+ */
+ @Test(expected = IOException.class, timeout = 120000)
+ public void testGroupOrSplitFailure() throws Exception {
+ final TableName tableName = TableName.valueOf(name.getMethodName());
+ try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
+ setupTable(connection, tableName, 10);
+
+ LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
+ int i = 0;
+
+ @Override
+ protected Pair<List<LoadQueueItem>, String> groupOrSplit(
+ Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item,
+ final Table table, final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
+ i++;
+
+ if (i == 5) {
+ throw new IOException("failure");
+ }
+ return super.groupOrSplit(regionGroups, item, table, startEndKeys);
+ }
+ };
+
+ // create HFiles for different column families
+ Path dir = buildBulkFiles(tableName, 1);
+ try (Table t = connection.getTable(tableName);
+ RegionLocator locator = connection.getRegionLocator(tableName);
+ Admin admin = connection.getAdmin()) {
+ lih.doBulkLoad(dir, admin, t, locator);
+ }
+ }
+
+ fail("doBulkLoad should have thrown an exception");
+ }
+
+ @Test(timeout = 120000)
+ public void testGroupOrSplitWhenRegionHoleExistsInMeta() throws Exception {
+ final TableName tableName = TableName.valueOf(name.getMethodName());
+ byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000100") };
+    // Share the connection. We were failing to find the table with our new reverse scan because
+    // it looks for the first region, not any region -- that is how it works now. The test below
+    // removes the first region; it used to rely on the Connection cache holding the first region.
+ Connection connection = ConnectionFactory.createConnection(util.getConfiguration());
+ Table table = connection.getTable(tableName);
+
+ setupTableWithSplitkeys(tableName, 10, SPLIT_KEYS);
+ Path dir = buildBulkFiles(tableName, 2);
+
+ final AtomicInteger countedLqis = new AtomicInteger();
+ LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) {
+
+ @Override
+ protected Pair<List<LoadQueueItem>, String> groupOrSplit(
+ Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item,
+ final Table htable, final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
+ Pair<List<LoadQueueItem>, String> lqis =
+ super.groupOrSplit(regionGroups, item, htable, startEndKeys);
+ if (lqis != null && lqis.getFirst() != null) {
+ countedLqis.addAndGet(lqis.getFirst().size());
+ }
+ return lqis;
+ }
+ };
+
+ // do bulkload when there is no region hole in hbase:meta.
+ try (Table t = connection.getTable(tableName);
+ RegionLocator locator = connection.getRegionLocator(tableName);
+ Admin admin = connection.getAdmin()) {
+ loader.doBulkLoad(dir, admin, t, locator);
+ } catch (Exception e) {
+ LOG.error("exeception=", e);
+ }
+    // check that all the data is loaded into the table.
+ this.assertExpectedTable(tableName, ROWCOUNT, 2);
+
+ dir = buildBulkFiles(tableName, 3);
+
+ // Mess it up by leaving a hole in the hbase:meta
+ List<HRegionInfo> regionInfos = MetaTableAccessor.getTableRegions(connection, tableName);
+ for (HRegionInfo regionInfo : regionInfos) {
+ if (Bytes.equals(regionInfo.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
+ MetaTableAccessor.deleteRegion(connection, regionInfo);
+ break;
+ }
+ }
+
+ try (Table t = connection.getTable(tableName);
+ RegionLocator locator = connection.getRegionLocator(tableName);
+ Admin admin = connection.getAdmin()) {
+ loader.doBulkLoad(dir, admin, t, locator);
+ } catch (Exception e) {
+ LOG.error("exception=", e);
+ assertTrue("IOException expected", e instanceof IOException);
+ }
+
+ table.close();
+
+ // Make sure at least the one region that still exists can be found.
+ regionInfos = MetaTableAccessor.getTableRegions(connection, tableName);
+ assertTrue(regionInfos.size() >= 1);
+
+ this.assertExpectedTable(connection, tableName, ROWCOUNT, 2);
+ connection.close();
+ }
+
+ /**
+ * Checks that all columns have the expected value and that there is the expected number of rows.
+ * @throws IOException
+ */
+ void assertExpectedTable(final Connection connection, TableName table, int count, int value)
+ throws IOException {
+ List<TableDescriptor> htds = util.getAdmin().listTableDescriptors(table.getNameAsString());
+    assertEquals(1, htds.size());
+ try (Table t = connection.getTable(table); ResultScanner sr = t.getScanner(new Scan())) {
+ int i = 0;
+ for (Result r; (r = sr.next()) != null;) {
+ r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream())
+ .forEach(v -> assertArrayEquals(value(value), v));
+ i++;
+ }
+ assertEquals(count, i);
+ } catch (IOException e) {
+ fail("Failed due to exception");
+ }
+ }
+}
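The tests above drive the relocated tool entirely through its public surface: construct with a Configuration, then call doBulkLoad with an Admin, Table and RegionLocator. A minimal sketch of that usage, assuming only the constructor and the doBulkLoad overload exercised by the tests (the table name and HFile directory below are placeholders):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.RegionLocator;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;

    public class BulkLoadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        TableName tableName = TableName.valueOf("exampleTable"); // placeholder
        Path hfileDir = new Path("/tmp/hfiles"); // placeholder HFileOutputFormat output
        try (Connection conn = ConnectionFactory.createConnection(conf);
            Table table = conn.getTable(tableName);
            RegionLocator locator = conn.getRegionLocator(tableName);
            Admin admin = conn.getAdmin()) {
          // Same entry point the tests use; internally the tool groups HFiles by
          // region, splits any file that spans a region boundary, and retries
          // until the queue drains or the retry limit is hit.
          new LoadIncrementalHFiles(conf).doBulkLoad(hfileDir, admin, table, locator);
        }
      }
    }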
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureLoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureLoadIncrementalHFiles.java
new file mode 100644
index 0000000..3d4f4c6
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureLoadIncrementalHFiles.java
@@ -0,0 +1,66 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.tool;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
+import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.access.AccessControlLists;
+import org.apache.hadoop.hbase.security.access.SecureTestUtil;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Reruns TestLoadIncrementalHFiles using LoadIncrementalHFiles in secure mode. This suite cannot
+ * verify the security handoff/turnover, as the miniCluster runs as the system user and thus has
+ * root privileges, and delegation tokens don't seem to work on miniDFS.
+ * <p>
+ * Thus SecureBulkload can only be completely verified by running integration tests against a
+ * secure cluster. This suite is still invaluable as it verifies the other mechanisms that need to
+ * be supported as part of a LoadIncrementalHFiles call.
+ */
+@Category({ MiscTests.class, LargeTests.class })
+public class TestSecureLoadIncrementalHFiles extends TestLoadIncrementalHFiles {
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ // set the always on security provider
+ UserProvider.setUserProviderForTesting(util.getConfiguration(),
+ HadoopSecurityEnabledUserProviderForTesting.class);
+ // setup configuration
+ SecureTestUtil.enableSecurity(util.getConfiguration());
+ util.getConfiguration().setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
+ MAX_FILES_PER_REGION_PER_FAMILY);
+ // change default behavior so that tag values are returned with normal rpcs
+ util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
+ KeyValueCodecWithTags.class.getCanonicalName());
+
+ util.startMiniCluster();
+
+ // Wait for the ACL table to become available
+ util.waitTableEnabled(AccessControlLists.ACL_TABLE_NAME);
+
+ setupNamespace();
+ }
+
+}
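The provider override above is what flips the client onto the secure code path: the load tool instantiates a UserProvider from its configuration (visible in the deleted mapreduce copy further down) and branches on whether Hadoop security is reported as enabled. A rough illustration, assuming UserProvider.instantiate and isHadoopSecurityEnabled behave as in the shipped client code:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.security.UserProvider;

    public class SecurityCheckSketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Same factory call the load tool makes internally.
        UserProvider provider = UserProvider.instantiate(conf);
        // With the testing provider installed via setUserProviderForTesting, this
        // reports true even on an insecure mini cluster, steering the test down
        // the secure bulk load path (delegation tokens, staging directory).
        System.out.println("hadoop security enabled: " + provider.isHadoopSecurityEnabled());
      }
    }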
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureLoadIncrementalHFilesSplitRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureLoadIncrementalHFilesSplitRecovery.java
new file mode 100644
index 0000000..58fea9d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureLoadIncrementalHFilesSplitRecovery.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.tool;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.access.AccessControlLists;
+import org.apache.hadoop.hbase.security.access.SecureTestUtil;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Reruns TestLoadIncrementalHFilesSplitRecovery using LoadIncrementalHFiles in secure mode. This
+ * suite cannot verify the security handoff/turnover, as the miniCluster runs as the system user
+ * and thus has root privileges, and delegation tokens don't seem to work on miniDFS.
+ * <p>
+ * Thus SecureBulkload can only be completely verified by running integration tests against a
+ * secure cluster. This suite is still invaluable as it verifies the other mechanisms that need to
+ * be supported as part of a LoadIncrementalHFiles call.
+ */
+@Category({ MiscTests.class, LargeTests.class })
+public class TestSecureLoadIncrementalHFilesSplitRecovery
+ extends TestLoadIncrementalHFilesSplitRecovery {
+
+ // This "overrides" the parent static method
+ // make sure they are in sync
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ util = new HBaseTestingUtility();
+ // set the always on security provider
+ UserProvider.setUserProviderForTesting(util.getConfiguration(),
+ HadoopSecurityEnabledUserProviderForTesting.class);
+ // setup configuration
+ SecureTestUtil.enableSecurity(util.getConfiguration());
+
+ util.startMiniCluster();
+
+ // Wait for the ACL table to become available
+ util.waitTableEnabled(AccessControlLists.ACL_TABLE_NAME);
+ }
+
+ // Disabling this test as it does not work in secure mode
+ @Test(timeout = 180000)
+ @Override
+ public void testBulkLoadPhaseFailure() {
+ }
+}
\ No newline at end of file
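The quoted "overrides" in the comment above is deliberate: static methods in Java hide rather than override, so JUnit simply runs whichever @BeforeClass the subclass declares, and nothing in the compiler keeps it aligned with the parent's version. A small standalone illustration of the hiding behavior:

    public class StaticHidingSketch {
      static class Parent {
        static String setup() { return "parent setup"; }
      }

      static class Child extends Parent {
        // Hides Parent.setup() rather than overriding it; @Override is not
        // even permitted on a static method.
        static String setup() { return "child setup"; }
      }

      public static void main(String[] args) {
        // Resolution happens at compile time against the referenced type.
        System.out.println(Parent.setup()); // parent setup
        System.out.println(Child.setup());  // child setup
      }
    }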
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java
----------------------------------------------------------------------
diff --git a/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java b/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java
index f45c0b9..33fbb68 100644
--- a/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java
+++ b/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java
@@ -48,7 +48,7 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.IntegrationTestBulkLoad;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java
index 040546d..2adba32 100644
--- a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java
+++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java
@@ -41,7 +41,7 @@ import java.util.List;
* path/to/hbase-spark.jar {path/to/output/HFiles}
*
* This example will output Put HFiles in {path/to/output/HFiles}, and the user can run
- * 'hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles' to load the HFiles into table to verify this example.
+ * 'hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles' to load the HFiles into the table to verify this example.
*/
final public class JavaHBaseBulkLoadExample {
private JavaHBaseBulkLoadExample() {}
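As the javadoc above says, the generated HFiles are loaded with the relocated tool. Assuming the new class keeps the org.apache.hadoop.util.Tool entry point of the mapreduce version it replaces (that interface is visible in the deletion further down), the same command-line invocation can also be driven programmatically; the output directory and table name below are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
    import org.apache.hadoop.util.ToolRunner;

    public class RunLoadToolSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Equivalent to: hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles \
        //   {path/to/output/HFiles} exampleTable
        int exit = ToolRunner.run(conf, new LoadIncrementalHFiles(conf),
            new String[] { "/path/to/output/HFiles", "exampleTable" });
        System.exit(exit);
      }
    }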
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java b/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
index bfacbe8..e383b5e 100644
--- a/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
+++ b/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.spark.example.hbasecontext.JavaHBaseBulkDeleteExample;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
index d2b707e..a427327 100644
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
@@ -20,7 +20,7 @@ package org.apache.hadoop.hbase.spark
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase.client.{Get, ConnectionFactory}
import org.apache.hadoop.hbase.io.hfile.{CacheConfig, HFile}
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles
import org.apache.hadoop.hbase.{HConstants, CellUtil, HBaseTestingUtility, TableName}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/src/main/asciidoc/_chapters/ops_mgt.adoc
----------------------------------------------------------------------
diff --git a/src/main/asciidoc/_chapters/ops_mgt.adoc b/src/main/asciidoc/_chapters/ops_mgt.adoc
index f96cd6c..6f7f9e0 100644
--- a/src/main/asciidoc/_chapters/ops_mgt.adoc
+++ b/src/main/asciidoc/_chapters/ops_mgt.adoc
@@ -577,7 +577,7 @@ There are two ways to invoke this utility, with explicit classname and via the d
.Explicit Classname
----
-$ bin/hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles <hdfs://storefileoutput> <tablename>
+$ bin/hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles <hdfs://storefileoutput> <tablename>
----
.Driver
[3/4] hbase git commit: HBASE-18699 Copy LoadIncrementalHFiles to
another package and mark the old one as deprecated
Posted by zh...@apache.org.
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 7b4a353..285530d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -18,1288 +18,60 @@
*/
package org.apache.hadoop.hbase.mapreduce;
-import static java.lang.String.format;
-
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.HashMultimap;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Multimap;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Multimaps;
-import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Deque;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.lang.mutable.MutableInt;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ClientServiceCallable;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
-import org.apache.hadoop.hbase.client.SecureBulkLoadClient;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.io.HFileLink;
-import org.apache.hadoop.hbase.io.HalfStoreFileReader;
-import org.apache.hadoop.hbase.io.Reference;
-import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileContext;
-import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
-import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
-import org.apache.hadoop.hbase.io.hfile.HFileScanner;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.regionserver.BloomType;
-import org.apache.hadoop.hbase.regionserver.HStore;
-import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
-import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
-import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.hbase.security.token.FsDelegationToken;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSHDFSUtils;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
/**
* Tool to load the output of HFileOutputFormat into an existing table.
+ * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0. Use
+ * {@link org.apache.hadoop.hbase.tool.LoadIncrementalHFiles} instead.
*/
+@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NM_SAME_SIMPLE_NAME_AS_SUPERCLASS",
+ justification = "Temporary glue. To be removed")
+@Deprecated
@InterfaceAudience.Public
-public class LoadIncrementalHFiles extends Configured implements Tool {
- private static final Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
- private boolean initalized = false;
-
- public static final String NAME = "completebulkload";
- static final String RETRY_ON_IO_EXCEPTION = "hbase.bulkload.retries.retryOnIOException";
- public static final String MAX_FILES_PER_REGION_PER_FAMILY
- = "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily";
- private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
- public final static String CREATE_TABLE_CONF_KEY = "create.table";
- public final static String IGNORE_UNMATCHED_CF_CONF_KEY = "ignore.unmatched.families";
- public final static String ALWAYS_COPY_FILES = "always.copy.files";
-
- // We use a '.' prefix which is ignored when walking directory trees
- // above. It is invalid family name.
- final static String TMP_DIR = ".tmp";
-
- private int maxFilesPerRegionPerFamily;
- private boolean assignSeqIds;
- private Set<String> unmatchedFamilies = new HashSet<>();
-
- // Source filesystem
- private FileSystem fs;
- // Source delegation token
- private FsDelegationToken fsDelegationToken;
- private String bulkToken;
- private UserProvider userProvider;
- private int nrThreads;
- private RpcControllerFactory rpcControllerFactory;
- private AtomicInteger numRetries;
-
- private Map<LoadQueueItem, ByteBuffer> retValue = null;
-
- public LoadIncrementalHFiles(Configuration conf) throws Exception {
- super(conf);
- this.rpcControllerFactory = new RpcControllerFactory(conf);
- initialize();
- }
-
- private void initialize() throws IOException {
- if (initalized) {
- return;
- }
- // make a copy, just to be sure we're not overriding someone else's config
- setConf(HBaseConfiguration.create(getConf()));
- Configuration conf = getConf();
- // disable blockcache for tool invocation, see HBASE-10500
- conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
- this.userProvider = UserProvider.instantiate(conf);
- this.fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
- assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
- maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
- nrThreads = conf.getInt("hbase.loadincremental.threads.max",
- Runtime.getRuntime().availableProcessors());
- initalized = true;
- numRetries = new AtomicInteger(1);
- }
-
- private void usage() {
- System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename" + "\n -D"
- + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n"
- + " Note: if you set this to 'no', then the target table must already exist in HBase\n -D"
- + IGNORE_UNMATCHED_CF_CONF_KEY + "=yes - can be used to ignore unmatched column families\n"
- + "\n");
- }
-
- private interface BulkHFileVisitor<TFamily> {
- TFamily bulkFamily(final byte[] familyName)
- throws IOException;
- void bulkHFile(final TFamily family, final FileStatus hfileStatus)
- throws IOException;
- }
+public class LoadIncrementalHFiles extends org.apache.hadoop.hbase.tool.LoadIncrementalHFiles {
/**
- * Iterate over the bulkDir hfiles.
- * Skip reference, HFileLink, files starting with "_" and non-valid hfiles.
+ * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0. Use
+ * {@link org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem} instead.
*/
- private static <TFamily> void visitBulkHFiles(final FileSystem fs, final Path bulkDir,
- final BulkHFileVisitor<TFamily> visitor) throws IOException {
- visitBulkHFiles(fs, bulkDir, visitor, true);
- }
-
- /**
- * Iterate over the bulkDir hfiles.
- * Skip reference, HFileLink, files starting with "_".
- * Check and skip non-valid hfiles by default, or skip this validation by setting
- * 'hbase.loadincremental.validate.hfile' to false.
- */
- private static <TFamily> void visitBulkHFiles(final FileSystem fs, final Path bulkDir,
- final BulkHFileVisitor<TFamily> visitor, final boolean validateHFile) throws IOException {
- if (!fs.exists(bulkDir)) {
- throw new FileNotFoundException("Bulkload dir " + bulkDir + " not found");
- }
-
- FileStatus[] familyDirStatuses = fs.listStatus(bulkDir);
- if (familyDirStatuses == null) {
- throw new FileNotFoundException("No families found in " + bulkDir);
- }
-
- for (FileStatus familyStat : familyDirStatuses) {
- if (!familyStat.isDirectory()) {
- LOG.warn("Skipping non-directory " + familyStat.getPath());
- continue;
- }
- Path familyDir = familyStat.getPath();
- byte[] familyName = familyDir.getName().getBytes();
- // Skip invalid family
- try {
- HColumnDescriptor.isLegalFamilyName(familyName);
- }
- catch (IllegalArgumentException e) {
- LOG.warn("Skipping invalid " + familyStat.getPath());
- continue;
- }
- TFamily family = visitor.bulkFamily(familyName);
-
- FileStatus[] hfileStatuses = fs.listStatus(familyDir);
- for (FileStatus hfileStatus : hfileStatuses) {
- if (!fs.isFile(hfileStatus.getPath())) {
- LOG.warn("Skipping non-file " + hfileStatus);
- continue;
- }
-
- Path hfile = hfileStatus.getPath();
- // Skip "_", reference, HFileLink
- String fileName = hfile.getName();
- if (fileName.startsWith("_")) {
- continue;
- }
- if (StoreFileInfo.isReference(fileName)) {
- LOG.warn("Skipping reference " + fileName);
- continue;
- }
- if (HFileLink.isHFileLink(fileName)) {
- LOG.warn("Skipping HFileLink " + fileName);
- continue;
- }
-
- // Validate HFile Format if needed
- if (validateHFile) {
- try {
- if (!HFile.isHFileFormat(fs, hfile)) {
- LOG.warn("the file " + hfile + " doesn't seems to be an hfile. skipping");
- continue;
- }
- } catch (FileNotFoundException e) {
- LOG.warn("the file " + hfile + " was removed");
- continue;
- }
- }
-
- visitor.bulkHFile(family, hfileStatus);
- }
- }
- }
-
- /**
- * Represents an HFile waiting to be loaded. An queue is used
- * in this class in order to support the case where a region has
- * split during the process of the load. When this happens,
- * the HFile is split into two physical parts across the new
- * region boundary, and each part is added back into the queue.
- * The import process finishes when the queue is empty.
- */
- public static class LoadQueueItem {
- final byte[] family;
- final Path hfilePath;
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NM_SAME_SIMPLE_NAME_AS_SUPERCLASS",
+ justification = "Temporary glue. To be removed")
+ @Deprecated
+ @InterfaceAudience.Public
+ public static class LoadQueueItem
+ extends org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem {
public LoadQueueItem(byte[] family, Path hfilePath) {
- this.family = family;
- this.hfilePath = hfilePath;
- }
-
- @Override
- public String toString() {
- return "family:"+ Bytes.toString(family) + " path:" + hfilePath.toString();
- }
-
- public byte[] getFamily() {
- return family;
- }
-
- public Path getFilePath() {
- return hfilePath;
- }
- }
-
- /*
- * Populate the Queue with given HFiles
- */
- private void populateLoadQueue(final Deque<LoadQueueItem> ret,
- Map<byte[], List<Path>> map) throws IOException {
- for (Map.Entry<byte[], List<Path>> entry : map.entrySet()) {
- for (Path p : entry.getValue()) {
- ret.add(new LoadQueueItem(entry.getKey(), p));
- }
+ super(family, hfilePath);
}
}
- /**
- * Walk the given directory for all HFiles, and return a Queue
- * containing all such files.
- */
- private void discoverLoadQueue(final Deque<LoadQueueItem> ret, final Path hfofDir,
- final boolean validateHFile) throws IOException {
- fs = hfofDir.getFileSystem(getConf());
- visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<byte[]>() {
- @Override
- public byte[] bulkFamily(final byte[] familyName) {
- return familyName;
- }
- @Override
- public void bulkHFile(final byte[] family, final FileStatus hfile) throws IOException {
- long length = hfile.getLen();
- if (length > getConf().getLong(HConstants.HREGION_MAX_FILESIZE,
- HConstants.DEFAULT_MAX_FILE_SIZE)) {
- LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size: " +
- length + " bytes can be problematic as it may lead to oversplitting.");
- }
- ret.add(new LoadQueueItem(family, hfile.getPath()));
- }
- }, validateHFile);
- }
-
- /**
- * Perform a bulk load of the given directory into the given
- * pre-existing table. This method is not threadsafe.
- *
- * @param hfofDir the directory that was provided as the output path
- * of a job using HFileOutputFormat
- * @param admin the Admin
- * @param table the table to load into
- * @param regionLocator region locator
- * @throws TableNotFoundException if table does not yet exist
- */
- public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
- RegionLocator regionLocator) throws TableNotFoundException, IOException {
- doBulkLoad(hfofDir, admin, table, regionLocator, false, false);
- }
-
- void cleanup(Admin admin, Deque<LoadQueueItem> queue, ExecutorService pool,
- SecureBulkLoadClient secureClient) throws IOException {
- fsDelegationToken.releaseDelegationToken();
- if (bulkToken != null && secureClient != null) {
- secureClient.cleanupBulkLoad(admin.getConnection(), bulkToken);
- }
- if (pool != null) {
- pool.shutdown();
- }
- if (!queue.isEmpty()) {
- StringBuilder err = new StringBuilder();
- err.append("-------------------------------------------------\n");
- err.append("Bulk load aborted with some files not yet loaded:\n");
- err.append("-------------------------------------------------\n");
- for (LoadQueueItem q : queue) {
- err.append(" ").append(q.hfilePath).append('\n');
- }
- LOG.error(err);
- }
- }
- /**
- * Perform a bulk load of the given directory into the given
- * pre-existing table. This method is not threadsafe.
- *
- * @param map map of family to List of hfiles
- * @param admin the Admin
- * @param table the table to load into
- * @param regionLocator region locator
- * @param silence true to ignore unmatched column families
- * @param copyFile always copy hfiles if true
- * @throws TableNotFoundException if table does not yet exist
- */
- public void doBulkLoad(Map<byte[], List<Path>> map, final Admin admin,
- Table table, RegionLocator regionLocator, boolean silence, boolean copyFile)
- throws TableNotFoundException, IOException {
- if (!admin.isTableAvailable(regionLocator.getName())) {
- throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
- }
- // LQI queue does not need to be threadsafe -- all operations on this queue
- // happen in this thread
- Deque<LoadQueueItem> queue = new LinkedList<>();
- ExecutorService pool = null;
- SecureBulkLoadClient secureClient = null;
- try {
- prepareHFileQueue(map, table, queue, silence);
- if (queue.isEmpty()) {
- LOG.warn("Bulk load operation did not get any files to load");
- return;
- }
- pool = createExecutorService();
- secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
- for (Map.Entry<byte[], List<Path>> entry : map.entrySet()) {
- for (Path p : entry.getValue()) {
- fs = p.getFileSystem(table.getConfiguration());
- break;
- }
- }
- retValue = performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
- } finally {
- cleanup(admin, queue, pool, secureClient);
- }
- }
-
- /**
- * Perform a bulk load of the given directory into the given
- * pre-existing table. This method is not threadsafe.
- *
- * @param hfofDir the directory that was provided as the output path
- * of a job using HFileOutputFormat
- * @param admin the Admin
- * @param table the table to load into
- * @param regionLocator region locator
- * @param silence true to ignore unmatched column families
- * @param copyFile always copy hfiles if true
- * @throws TableNotFoundException if table does not yet exist
- */
- public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
- RegionLocator regionLocator, boolean silence, boolean copyFile)
- throws TableNotFoundException, IOException {
- if (!admin.isTableAvailable(regionLocator.getName())) {
- throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
- }
-
- /*
- * Checking hfile format is a time-consuming operation, we should have an option to skip
- * this step when bulkloading millions of HFiles. See HBASE-13985.
- */
- boolean validateHFile = getConf().getBoolean("hbase.loadincremental.validate.hfile", true);
- if (!validateHFile) {
- LOG.warn("You are skipping HFiles validation, it might cause some data loss if files " +
- "are not correct. If you fail to read data from your table after using this " +
- "option, consider removing the files and bulkload again without this option. " +
- "See HBASE-13985");
- }
- // LQI queue does not need to be threadsafe -- all operations on this queue
- // happen in this thread
- Deque<LoadQueueItem> queue = new LinkedList<>();
- ExecutorService pool = null;
- SecureBulkLoadClient secureClient = null;
- try {
- prepareHFileQueue(hfofDir, table, queue, validateHFile, silence);
-
- if (queue.isEmpty()) {
- LOG.warn("Bulk load operation did not find any files to load in " +
- "directory " + hfofDir != null ? hfofDir.toUri() : "" + ". Does it contain files in " +
- "subdirectories that correspond to column family names?");
- return;
- }
- pool = createExecutorService();
- secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
- retValue = performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
- } finally {
- cleanup(admin, queue, pool, secureClient);
- }
- }
-
- Map<LoadQueueItem, ByteBuffer> performBulkLoad(final Admin admin, Table table,
- RegionLocator regionLocator, Deque<LoadQueueItem> queue, ExecutorService pool,
- SecureBulkLoadClient secureClient, boolean copyFile) throws IOException {
- int count = 0;
-
- if(isSecureBulkLoadEndpointAvailable()) {
- LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in future releases.");
- LOG.warn("Secure bulk load has been integrated into HBase core.");
- }
-
- //If using secure bulk load, get source delegation token, and
- //prepare staging directory and token
- // fs is the source filesystem
- fsDelegationToken.acquireDelegationToken(fs);
- bulkToken = secureClient.prepareBulkLoad(admin.getConnection());
- Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = null;
-
- Map<LoadQueueItem, ByteBuffer> item2RegionMap = new HashMap<>();
- // Assumes that region splits can happen while this occurs.
- while (!queue.isEmpty()) {
- // need to reload split keys each iteration.
- final Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys();
- if (count != 0) {
- LOG.info("Split occurred while grouping HFiles, retry attempt " +
- + count + " with " + queue.size() + " files remaining to group or split");
- }
-
- int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);
- maxRetries = Math.max(maxRetries, startEndKeys.getFirst().length + 1);
- if (maxRetries != 0 && count >= maxRetries) {
- throw new IOException("Retry attempted " + count +
- " times without completing, bailing out");
- }
- count++;
-
- // Using ByteBuffer for byte[] equality semantics
- pair = groupOrSplitPhase(table, pool, queue, startEndKeys);
- Multimap<ByteBuffer, LoadQueueItem> regionGroups = pair.getFirst();
-
- if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
- // Error is logged inside checkHFilesCountPerRegionPerFamily.
- throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily
- + " hfiles to one family of one region");
- }
-
- bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups, copyFile,
- item2RegionMap);
-
- // NOTE: The next iteration's split / group could happen in parallel to
- // atomic bulkloads assuming that there are splits and no merges, and
- // that we can atomically pull out the groups we want to retry.
- }
-
- if (!queue.isEmpty()) {
- throw new RuntimeException("Bulk load aborted with some files not yet loaded."
- + "Please check log for more details.");
- }
- return item2RegionMap;
- }
-
- protected ClientServiceCallable<byte[]> buildClientServiceCallable(final Connection conn,
- TableName tableName, byte[] first, Collection<LoadQueueItem> lqis, boolean copyFile) {
-
- final List<Pair<byte[], String>> famPaths = new ArrayList<>(lqis.size());
- for (LoadQueueItem lqi : lqis) {
- famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
- }
-
- return new ClientServiceCallable<byte[]>(conn,
- tableName, first, rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
- @Override
- protected byte[] rpcCall() throws Exception {
- SecureBulkLoadClient secureClient = null;
- boolean success = false;
- try {
- LOG.debug("Going to connect to server " + getLocation() + " for row "
- + Bytes.toStringBinary(getRow()) + " with hfile group " +
- LoadIncrementalHFiles.this.toString( famPaths));
- byte[] regionName = getLocation().getRegionInfo().getRegionName();
- try (Table table = conn.getTable(getTableName())) {
- secureClient = new SecureBulkLoadClient(getConf(), table);
- success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
- assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile);
- }
- return success ? regionName : null;
- } finally {
- //Best effort copying of files that might not have been imported
- //from the staging directory back to original location
- //in user directory
- if (secureClient != null && !success) {
- FileSystem targetFs = FileSystem.get(getConf());
- // fs is the source filesystem
- if (fs == null) {
- fs = lqis.iterator().next().hfilePath.getFileSystem(getConf());
- }
- // Check to see if the source and target filesystems are the same
- // If they are the same filesystem, we will try move the files back
- // because previously we moved them to the staging directory.
- if (FSHDFSUtils.isSameHdfs(getConf(), fs, targetFs)) {
- for (Pair<byte[], String> el : famPaths) {
- Path hfileStagingPath = null;
- Path hfileOrigPath = new Path(el.getSecond());
- try {
- hfileStagingPath = new Path(new Path(bulkToken, Bytes.toString(el.getFirst())),
- hfileOrigPath.getName());
- if (targetFs.rename(hfileStagingPath, hfileOrigPath)) {
- LOG.debug("Moved back file " + hfileOrigPath + " from " +
- hfileStagingPath);
- } else if (targetFs.exists(hfileStagingPath)) {
- LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
- hfileStagingPath);
- }
- } catch (Exception ex) {
- LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
- hfileStagingPath, ex);
- }
- }
- }
- }
- }
- }
- };
- }
-
- /**
- * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the
- * passed directory and validates whether the prepared queue has all the valid table column
- * families in it.
- * @param hfilesDir directory containing list of hfiles to be loaded into the table
- * @param table table to which hfiles should be loaded
- * @param queue queue which needs to be loaded into the table
- * @param validateHFile if true hfiles will be validated for its format
- * @throws IOException If any I/O or network error occurred
- */
- public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
- boolean validateHFile) throws IOException {
- prepareHFileQueue(hfilesDir, table, queue, validateHFile, false);
- }
-
- /**
- * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the
- * passed directory and validates whether the prepared queue has all the valid table column
- * families in it.
- * @param hfilesDir directory containing list of hfiles to be loaded into the table
- * @param table table to which hfiles should be loaded
- * @param queue queue which needs to be loaded into the table
- * @param validateHFile if true hfiles will be validated for its format
- * @param silence true to ignore unmatched column families
- * @throws IOException If any I/O or network error occurred
- */
- public void prepareHFileQueue(Path hfilesDir, Table table,
- Deque<LoadQueueItem> queue, boolean validateHFile, boolean silence) throws IOException {
- discoverLoadQueue(queue, hfilesDir, validateHFile);
- validateFamiliesInHFiles(table, queue, silence);
- }
-
- /**
- * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the
- * passed directory and validates whether the prepared queue has all the valid table column
- * families in it.
- * @param map map of family to List of hfiles
- * @param table table to which hfiles should be loaded
- * @param queue queue which needs to be loaded into the table
- * @param silence true to ignore unmatched column families
- * @throws IOException If any I/O or network error occurred
- */
- public void prepareHFileQueue(Map<byte[], List<Path>> map, Table table,
- Deque<LoadQueueItem> queue, boolean silence) throws IOException {
- populateLoadQueue(queue, map);
- validateFamiliesInHFiles(table, queue, silence);
- }
-
- // Initialize a thread pool
- private ExecutorService createExecutorService() {
- ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
- builder.setNameFormat("LoadIncrementalHFiles-%1$d");
- ExecutorService pool = new ThreadPoolExecutor(nrThreads, nrThreads, 60, TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(), builder.build());
- ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
- return pool;
- }
-
- /**
- * Checks whether there is any invalid family name in HFiles to be bulk loaded.
- */
- private void validateFamiliesInHFiles(Table table, Deque<LoadQueueItem> queue, boolean silence)
- throws IOException {
- ColumnFamilyDescriptor[] families = table.getDescriptor().getColumnFamilies();
- List<String> familyNames = new ArrayList<>(families.length);
- for (ColumnFamilyDescriptor family : families) {
- familyNames.add(family.getNameAsString());
- }
- Iterator<LoadQueueItem> queueIter = queue.iterator();
- while (queueIter.hasNext()) {
- LoadQueueItem lqi = queueIter.next();
- String familyNameInHFile = Bytes.toString(lqi.family);
- if (!familyNames.contains(familyNameInHFile)) {
- unmatchedFamilies.add(familyNameInHFile);
- }
- }
- if (unmatchedFamilies.size() > 0) {
- String msg =
- "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: "
- + unmatchedFamilies + "; valid family names of table " + table.getName() + " are: "
- + familyNames;
- LOG.error(msg);
- if (!silence) throw new IOException(msg);
- }
- }
-
- /**
- * Used by the replication sink to load the hfiles from the source cluster. It does the following,
- * <ol>
- * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)}</li>
- * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)
- * </li>
- * </ol>
- * @param table Table to which these hfiles should be loaded to
- * @param conn Connection to use
- * @param queue {@link LoadQueueItem} has hfiles yet to be loaded
- * @param startEndKeys starting and ending row keys of the region
- */
- public void loadHFileQueue(final Table table, final Connection conn, Deque<LoadQueueItem> queue,
- Pair<byte[][], byte[][]> startEndKeys) throws IOException {
- loadHFileQueue(table, conn, queue, startEndKeys, false);
- }
-
- /**
- * Used by the replication sink to load the hfiles from the source cluster. It does the following,
- * <ol>
- * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)}</li>
- * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)
- * </li>
- * </ol>
- * @param table Table to which these hfiles should be loaded to
- * @param conn Connection to use
- * @param queue {@link LoadQueueItem} has hfiles yet to be loaded
- * @param startEndKeys starting and ending row keys of the region
- */
- public void loadHFileQueue(final Table table, final Connection conn, Deque<LoadQueueItem> queue,
- Pair<byte[][], byte[][]> startEndKeys, boolean copyFile) throws IOException {
- ExecutorService pool = null;
- try {
- pool = createExecutorService();
- Multimap<ByteBuffer, LoadQueueItem> regionGroups =
- groupOrSplitPhase(table, pool, queue, startEndKeys).getFirst();
- bulkLoadPhase(table, conn, pool, queue, regionGroups, copyFile, null);
- } finally {
- if (pool != null) {
- pool.shutdown();
- }
- }
- }
-
- /**
- * This takes the LQI's grouped by likely regions and attempts to bulk load
- * them. Any failures are re-queued for another pass with the
- * groupOrSplitPhase.
- */
- protected void bulkLoadPhase(final Table table, final Connection conn,
- ExecutorService pool, Deque<LoadQueueItem> queue,
- final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile,
- Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
- // atomically bulk load the groups.
- Set<Future<List<LoadQueueItem>>> loadingFutures = new HashSet<>();
- for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> e: regionGroups.asMap().entrySet()){
- final byte[] first = e.getKey().array();
- final Collection<LoadQueueItem> lqis = e.getValue();
-
- final ClientServiceCallable<byte[]> serviceCallable =
- buildClientServiceCallable(conn, table.getName(), first, lqis, copyFile);
-
- final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
- @Override
- public List<LoadQueueItem> call() throws Exception {
- List<LoadQueueItem> toRetry =
- tryAtomicRegionLoad(serviceCallable, table.getName(), first, lqis);
- return toRetry;
- }
- };
- if (item2RegionMap != null) {
- for (LoadQueueItem lqi : lqis) {
- item2RegionMap.put(lqi, e.getKey());
- }
- }
- loadingFutures.add(pool.submit(call));
- }
-
- // get all the results.
- for (Future<List<LoadQueueItem>> future : loadingFutures) {
- try {
- List<LoadQueueItem> toRetry = future.get();
-
- if (item2RegionMap != null) {
- for (LoadQueueItem lqi : toRetry) {
- item2RegionMap.remove(lqi);
- }
- }
- // LQIs that are requeued to be regrouped.
- queue.addAll(toRetry);
-
- } catch (ExecutionException e1) {
- Throwable t = e1.getCause();
- if (t instanceof IOException) {
- // At this point something unrecoverable has happened.
- // TODO Implement bulk load recovery
- throw new IOException("BulkLoad encountered an unrecoverable problem", t);
- }
- LOG.error("Unexpected execution exception during bulk load", e1);
- throw new IllegalStateException(t);
- } catch (InterruptedException e1) {
- LOG.error("Unexpected interrupted exception during bulk load", e1);
- throw (InterruptedIOException)new InterruptedIOException().initCause(e1);
- }
- }
- }
-
- private boolean checkHFilesCountPerRegionPerFamily(
- final Multimap<ByteBuffer, LoadQueueItem> regionGroups) {
- for (Entry<ByteBuffer,
- ? extends Collection<LoadQueueItem>> e: regionGroups.asMap().entrySet()) {
- final Collection<LoadQueueItem> lqis = e.getValue();
- HashMap<byte[], MutableInt> filesMap = new HashMap<>();
- for (LoadQueueItem lqi: lqis) {
- MutableInt count = filesMap.get(lqi.family);
- if (count == null) {
- count = new MutableInt();
- filesMap.put(lqi.family, count);
- }
- count.increment();
- if (count.intValue() > maxFilesPerRegionPerFamily) {
- LOG.error("Trying to load more than " + maxFilesPerRegionPerFamily
- + " hfiles to family " + Bytes.toStringBinary(lqi.family)
- + " of region with start key "
- + Bytes.toStringBinary(e.getKey()));
- return false;
- }
- }
- }
- return true;
- }
-
- /**
- * @param table the table to load into
- * @param pool the ExecutorService
- * @param queue the queue for LoadQueueItem
- * @param startEndKeys start and end keys
- * @return A map that groups LQI by likely bulk load region targets and Set of missing hfiles.
- */
- private Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> groupOrSplitPhase(
- final Table table, ExecutorService pool, Deque<LoadQueueItem> queue,
- final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
- // <region start key, LQI> need synchronized only within this scope of this
- // phase because of the puts that happen in futures.
- Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create();
- final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);
- Set<String> missingHFiles = new HashSet<>();
- Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = new Pair<>(regionGroups,
- missingHFiles);
-
- // drain LQIs and figure out bulk load groups
- Set<Future<Pair<List<LoadQueueItem>, String>>> splittingFutures = new HashSet<>();
- while (!queue.isEmpty()) {
- final LoadQueueItem item = queue.remove();
-
- final Callable<Pair<List<LoadQueueItem>, String>> call =
- new Callable<Pair<List<LoadQueueItem>, String>>() {
- @Override
- public Pair<List<LoadQueueItem>, String> call() throws Exception {
- Pair<List<LoadQueueItem>, String> splits = groupOrSplit(regionGroups, item, table,
- startEndKeys);
- return splits;
- }
- };
- splittingFutures.add(pool.submit(call));
- }
- // get all the results. All grouping and splitting must finish before
- // we can attempt the atomic loads.
- for (Future<Pair<List<LoadQueueItem>, String>> lqis : splittingFutures) {
- try {
- Pair<List<LoadQueueItem>, String> splits = lqis.get();
- if (splits != null) {
- if (splits.getFirst() != null) {
- queue.addAll(splits.getFirst());
- } else {
- missingHFiles.add(splits.getSecond());
- }
- }
- } catch (ExecutionException e1) {
- Throwable t = e1.getCause();
- if (t instanceof IOException) {
- LOG.error("IOException during splitting", e1);
- throw (IOException)t; // would have been thrown if not parallelized,
- }
- LOG.error("Unexpected execution exception during splitting", e1);
- throw new IllegalStateException(t);
- } catch (InterruptedException e1) {
- LOG.error("Unexpected interrupted exception during splitting", e1);
- throw (InterruptedIOException)new InterruptedIOException().initCause(e1);
- }
- }
- return pair;
- }
-
- // unique file name for the table
- private String getUniqueName() {
- return UUID.randomUUID().toString().replaceAll("-", "");
- }
-
- protected List<LoadQueueItem> splitStoreFile(final LoadQueueItem item,
- final Table table, byte[] startKey,
- byte[] splitKey) throws IOException {
- final Path hfilePath = item.hfilePath;
-
- Path tmpDir = item.hfilePath.getParent();
- if (!tmpDir.getName().equals(TMP_DIR)) {
- tmpDir = new Path(tmpDir, TMP_DIR);
- }
-
- LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
- "region. Splitting...");
-
- String uniqueName = getUniqueName();
- HColumnDescriptor familyDesc = table.getTableDescriptor().getFamily(item.family);
-
- Path botOut = new Path(tmpDir, uniqueName + ".bottom");
- Path topOut = new Path(tmpDir, uniqueName + ".top");
- splitStoreFile(getConf(), hfilePath, familyDesc, splitKey, botOut, topOut);
-
- FileSystem fs = tmpDir.getFileSystem(getConf());
- fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx"));
- fs.setPermission(botOut, FsPermission.valueOf("-rwxrwxrwx"));
- fs.setPermission(topOut, FsPermission.valueOf("-rwxrwxrwx"));
-
- // Add these back at the *front* of the queue, so there's a lower
- // chance that the region will just split again before we get there.
- List<LoadQueueItem> lqis = new ArrayList<>(2);
- lqis.add(new LoadQueueItem(item.family, botOut));
- lqis.add(new LoadQueueItem(item.family, topOut));
-
- // If the current item is already the result of previous splits,
- // we don't need it anymore. Clean up to save space.
- // It is not part of the original input files.
- try {
- tmpDir = item.hfilePath.getParent();
- if (tmpDir.getName().equals(TMP_DIR)) {
- fs.delete(item.hfilePath, false);
- }
- } catch (IOException e) {
- LOG.warn("Unable to delete temporary split file " + item.hfilePath);
- }
- LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
- return lqis;
- }
-
- /**
- * Attempt to assign the given load queue item into its target region group.
- * If the hfile boundary no longer fits into a region, physically splits
- * the hfile such that the new bottom half will fit and returns the list of
- * LQI's corresponding to the resultant hfiles.
- *
- * protected for testing
- * @throws IOException if an IO failure is encountered
- */
- protected Pair<List<LoadQueueItem>, String> groupOrSplit(
- Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item, final Table table,
- final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
- final Path hfilePath = item.hfilePath;
- // fs is the source filesystem
- if (fs == null) {
- fs = hfilePath.getFileSystem(getConf());
- }
- HFile.Reader hfr = null;
- try {
- hfr = HFile.createReader(fs, hfilePath, new CacheConfig(getConf()), true, getConf());
- } catch (FileNotFoundException fnfe) {
- LOG.debug("encountered", fnfe);
- return new Pair<>(null, hfilePath.getName());
- }
- final byte[] first, last;
- try {
- hfr.loadFileInfo();
- first = hfr.getFirstRowKey();
- last = hfr.getLastRowKey();
- } finally {
- hfr.close();
- }
-
- LOG.info("Trying to load hfile=" + hfilePath +
- " first=" + Bytes.toStringBinary(first) +
- " last=" + Bytes.toStringBinary(last));
- if (first == null || last == null) {
- assert first == null && last == null;
- // TODO what if this is due to a bad HFile?
- LOG.info("hfile " + hfilePath + " has no entries, skipping");
- return null;
- }
- if (Bytes.compareTo(first, last) > 0) {
- throw new IllegalArgumentException(
- "Invalid range: " + Bytes.toStringBinary(first) +
- " > " + Bytes.toStringBinary(last));
- }
- int idx = Arrays.binarySearch(startEndKeys.getFirst(), first,
- Bytes.BYTES_COMPARATOR);
- if (idx < 0) {
- // not on boundary, returns -(insertion index). Calculate region it
- // would be in.
- idx = -(idx + 1) - 1;
- }
- final int indexForCallable = idx;
-
- /**
- * we can consider there is a region hole in following conditions. 1) if idx < 0,then first
- * region info is lost. 2) if the endkey of a region is not equal to the startkey of the next
- * region. 3) if the endkey of the last region is not empty.
- */
- if (indexForCallable < 0) {
- throw new IOException("The first region info for table "
- + table.getName()
- + " can't be found in hbase:meta.Please use hbck tool to fix it first.");
- } else if ((indexForCallable == startEndKeys.getFirst().length - 1)
- && !Bytes.equals(startEndKeys.getSecond()[indexForCallable], HConstants.EMPTY_BYTE_ARRAY)) {
- throw new IOException("The last region info for table "
- + table.getName()
- + " can't be found in hbase:meta.Please use hbck tool to fix it first.");
- } else if (indexForCallable + 1 < startEndKeys.getFirst().length
- && !(Bytes.compareTo(startEndKeys.getSecond()[indexForCallable],
- startEndKeys.getFirst()[indexForCallable + 1]) == 0)) {
- throw new IOException("The endkey of one region for table "
- + table.getName()
- + " is not equal to the startkey of the next region in hbase:meta."
- + "Please use hbck tool to fix it first.");
- }
-
- boolean lastKeyInRange =
- Bytes.compareTo(last, startEndKeys.getSecond()[idx]) < 0 ||
- Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY);
- if (!lastKeyInRange) {
- List<LoadQueueItem> lqis = splitStoreFile(item, table,
- startEndKeys.getFirst()[indexForCallable],
- startEndKeys.getSecond()[indexForCallable]);
- return new Pair<>(lqis, null);
- }
-
- // group regions.
- regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[idx]), item);
- return null;
- }
-
- /**
- * Attempts to do an atomic load of many hfiles into a region. If it fails,
- * it returns a list of hfiles that need to be retried. If it is successful
- * it will return an empty list.
- *
- * NOTE: To maintain row atomicity guarantees, region server callable should
- * succeed atomically and fails atomically.
- *
- * Protected for testing.
- *
- * @return empty list if success, list of items to retry on recoverable
- * failure
- */
- protected List<LoadQueueItem> tryAtomicRegionLoad(ClientServiceCallable<byte[]> serviceCallable,
- final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis)
- throws IOException {
- try {
- List<LoadQueueItem> toRetry = new ArrayList<>();
- Configuration conf = getConf();
- byte[] region = RpcRetryingCallerFactory.instantiate(conf,
- null).<byte[]> newCaller()
- .callWithRetries(serviceCallable, Integer.MAX_VALUE);
- if (region == null) {
- LOG.warn("Attempt to bulk load region containing "
- + Bytes.toStringBinary(first) + " into table "
- + tableName + " with files " + lqis
- + " failed. This is recoverable and they will be retried.");
- toRetry.addAll(lqis); // return lqi's to retry
- }
- // success
- return toRetry;
- } catch (IOException e) {
- LOG.error("Encountered unrecoverable error from region server, additional details: "
- + serviceCallable.getExceptionMessageAdditionalDetail(), e);
- throw e;
- }
- }
-
- private final String toString(List<Pair<byte[], String>> list) {
- StringBuffer sb = new StringBuffer();
- sb.append("[");
- if(list != null){
- for(Pair<byte[], String> pair: list) {
- sb.append("{");
- sb.append(Bytes.toStringBinary(pair.getFirst()));
- sb.append(",");
- sb.append(pair.getSecond());
- sb.append("}");
- }
- }
- sb.append("]");
- return sb.toString();
- }
- private boolean isSecureBulkLoadEndpointAvailable() {
- String classes = getConf().get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
- return classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
- }
-
- /**
- * Split a storefile into a top and bottom half, maintaining
- * the metadata, recreating bloom filters, etc.
- */
- static void splitStoreFile(
- Configuration conf, Path inFile,
- HColumnDescriptor familyDesc, byte[] splitKey,
- Path bottomOut, Path topOut) throws IOException {
- // Open reader with no block cache, and not in-memory
- Reference topReference = Reference.createTopReference(splitKey);
- Reference bottomReference = Reference.createBottomReference(splitKey);
-
- copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
- copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
- }
-
- /**
- * Copy half of an HFile into a new HFile.
- */
- private static void copyHFileHalf(
- Configuration conf, Path inFile, Path outFile, Reference reference,
- HColumnDescriptor familyDescriptor)
- throws IOException {
- FileSystem fs = inFile.getFileSystem(conf);
- CacheConfig cacheConf = new CacheConfig(conf);
- HalfStoreFileReader halfReader = null;
- StoreFileWriter halfWriter = null;
- try {
- halfReader = new HalfStoreFileReader(fs, inFile, cacheConf, reference, true,
- new AtomicInteger(0), true, conf);
- Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();
-
- int blocksize = familyDescriptor.getBlocksize();
- Algorithm compression = familyDescriptor.getCompressionType();
- BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
- HFileContext hFileContext = new HFileContextBuilder()
- .withCompression(compression)
- .withChecksumType(HStore.getChecksumType(conf))
- .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
- .withBlockSize(blocksize)
- .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding())
- .withIncludesTags(true)
- .build();
- halfWriter = new StoreFileWriter.Builder(conf, cacheConf,
- fs)
- .withFilePath(outFile)
- .withBloomType(bloomFilterType)
- .withFileContext(hFileContext)
- .build();
- HFileScanner scanner = halfReader.getScanner(false, false, false);
- scanner.seekTo();
- do {
- halfWriter.append(scanner.getCell());
- } while (scanner.next());
-
- for (Map.Entry<byte[],byte[]> entry : fileInfo.entrySet()) {
- if (shouldCopyHFileMetaKey(entry.getKey())) {
- halfWriter.appendFileInfo(entry.getKey(), entry.getValue());
- }
- }
- } finally {
- if (halfWriter != null) {
- halfWriter.close();
- }
- if (halfReader != null) {
- halfReader.close(cacheConf.shouldEvictOnClose());
- }
- }
- }
-
- private static boolean shouldCopyHFileMetaKey(byte[] key) {
- // skip encoding to keep hfile meta consistent with data block info, see HBASE-15085
- if (Bytes.equals(key, HFileDataBlockEncoder.DATA_BLOCK_ENCODING)) {
- return false;
- }
-
- return !HFile.isReservedFileInfoKey(key);
- }
-
- /*
- * Infers region boundaries for a new table.
- * Parameter:
- * bdryMap is a map between keys to an integer belonging to {+1, -1}
- * If a key is a start key of a file, then it maps to +1
- * If a key is an end key of a file, then it maps to -1
- * Algo:
- * 1) Poll on the keys in order:
- * a) Keep adding the mapped values to these keys (runningSum)
- * b) Each time runningSum reaches 0, add the start Key from when the runningSum had started to
- * a boundary list.
- * 2) Return the boundary list.
- */
- public static byte[][] inferBoundaries(TreeMap<byte[], Integer> bdryMap) {
- ArrayList<byte[]> keysArray = new ArrayList<>();
- int runningValue = 0;
- byte[] currStartKey = null;
- boolean firstBoundary = true;
-
- for (Map.Entry<byte[], Integer> item: bdryMap.entrySet()) {
- if (runningValue == 0) {
- currStartKey = item.getKey();
- }
- runningValue += item.getValue();
- if (runningValue == 0) {
- if (!firstBoundary) {
- keysArray.add(currStartKey);
- }
- firstBoundary = false;
- }
- }
-
- return keysArray.toArray(new byte[0][0]);
- }
-
- /*
- * If the table is created for the first time, then "completebulkload" reads the files twice.
- * More modifications necessary if we want to avoid doing it.
- */
- private void createTable(TableName tableName, String dirPath, Admin admin) throws IOException {
- final Path hfofDir = new Path(dirPath);
- final FileSystem fs = hfofDir.getFileSystem(getConf());
-
- // Add column families
- // Build a set of keys
- final HTableDescriptor htd = new HTableDescriptor(tableName);
- final TreeMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
- visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<HColumnDescriptor>() {
- @Override
- public HColumnDescriptor bulkFamily(final byte[] familyName) {
- HColumnDescriptor hcd = new HColumnDescriptor(familyName);
- htd.addFamily(hcd);
- return hcd;
- }
- @Override
- public void bulkHFile(final HColumnDescriptor hcd, final FileStatus hfileStatus)
- throws IOException {
- Path hfile = hfileStatus.getPath();
- try (HFile.Reader reader =
- HFile.createReader(fs, hfile, new CacheConfig(getConf()), true, getConf())) {
- if (hcd.getCompressionType() != reader.getFileContext().getCompression()) {
- hcd.setCompressionType(reader.getFileContext().getCompression());
- LOG.info("Setting compression " + hcd.getCompressionType().name() + " for family " +
- hcd.toString());
- }
- reader.loadFileInfo();
- byte[] first = reader.getFirstRowKey();
- byte[] last = reader.getLastRowKey();
-
- LOG.info("Trying to figure out region boundaries hfile=" + hfile + " first=" +
- Bytes.toStringBinary(first) + " last=" + Bytes.toStringBinary(last));
-
- // To eventually infer start key-end key boundaries
- Integer value = map.containsKey(first) ? map.get(first) : 0;
- map.put(first, value + 1);
-
- value = map.containsKey(last) ? map.get(last) : 0;
- map.put(last, value - 1);
- }
- }
- });
-
- byte[][] keys = LoadIncrementalHFiles.inferBoundaries(map);
- admin.createTable(htd, keys);
-
- LOG.info("Table "+ tableName +" is available!!");
+ public LoadIncrementalHFiles(Configuration conf) {
+ super(conf);
}
public Map<LoadQueueItem, ByteBuffer> run(String dirPath, Map<byte[], List<Path>> map,
TableName tableName) throws IOException {
- initialize();
- try (Connection connection = ConnectionFactory.createConnection(getConf());
- Admin admin = connection.getAdmin()) {
-
- boolean tableExists = admin.tableExists(tableName);
- if (!tableExists) {
- if (dirPath != null && "yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"))) {
- this.createTable(tableName, dirPath, admin);
- } else {
- String errorMsg = format("Table '%s' does not exist.", tableName);
- LOG.error(errorMsg);
- throw new TableNotFoundException(errorMsg);
- }
- }
- Path hfofDir = null;
- if (dirPath != null) {
- hfofDir = new Path(dirPath);
- }
-
- try (Table table = connection.getTable(tableName);
- RegionLocator locator = connection.getRegionLocator(tableName)) {
- boolean silence = "yes".equalsIgnoreCase(getConf().get(IGNORE_UNMATCHED_CF_CONF_KEY, ""));
- boolean copyFiles = getConf().getBoolean(ALWAYS_COPY_FILES, false);
- if (dirPath != null) {
- doBulkLoad(hfofDir, admin, table, locator, silence, copyFiles);
- } else {
- doBulkLoad(map, admin, table, locator, silence, copyFiles);
- }
- return retValue;
- }
- }
- }
-
- @Override
- public int run(String[] args) throws Exception {
- if (args.length < 2) {
- usage();
- return -1;
- }
-
- String dirPath = args[0];
- TableName tableName = TableName.valueOf(args[1]);
- Map<LoadQueueItem, ByteBuffer> loaded = run(dirPath, null, tableName);
- if (loaded == null || !loaded.isEmpty()) return 0;
- return -1;
- }
-
- public static void main(String[] args) throws Exception {
- Configuration conf = HBaseConfiguration.create();
- int ret = ToolRunner.run(conf, new LoadIncrementalHFiles(conf), args);
- System.exit(ret);
- }
-
- /**
- * Called from replication sink, where it manages bulkToken(staging directory) by itself. This is
- * used only when SecureBulkLoadEndpoint is configured in hbase.coprocessor.region.classes
- * property. This directory is used as a temporary directory where all files are initially
- * copied/moved from user given directory, set all the required file permissions and then from
- * their it is finally loaded into a table. This should be set only when, one would like to manage
- * the staging directory by itself. Otherwise this tool will handle this by itself.
- * @param stagingDir staging directory path
- */
- public void setBulkToken(String stagingDir) {
- this.bulkToken = stagingDir;
+ Map<org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem, ByteBuffer> originRet;
+ if (dirPath != null) {
+ originRet = run(dirPath, tableName);
+ } else {
+ originRet = run(map, tableName);
+ }
+ Map<LoadQueueItem, ByteBuffer> ret = new HashMap<>();
+ originRet.forEach((k, v) -> {
+ ret.put(new LoadQueueItem(k.getFamily(), k.getFilePath()), v);
+ });
+ return ret;
}
}
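
With this change, callers migrating off the deprecated mapreduce class generally only
need to switch the import; doBulkLoad keeps the same signature in the new package. A
minimal sketch, assuming a running cluster plus a hypothetical table "mytable" and
HFileOutputFormat output under /user/hbase/hfof-output:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles; // was o.a.h.hbase.mapreduce

    Configuration conf = HBaseConfiguration.create();
    TableName tn = TableName.valueOf("mytable");
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        Table table = conn.getTable(tn);
        RegionLocator locator = conn.getRegionLocator(tn)) {
      // Same call as before; only the package of LoadIncrementalHFiles changed.
      new LoadIncrementalHFiles(conf).doBulkLoad(
          new Path("/user/hbase/hfof-output"), admin, table, locator);
    }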
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
index 2308ddf..741f200 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
@@ -61,7 +61,6 @@ import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobFileName;
import org.apache.hadoop.hbase.mob.MobUtils;
@@ -82,6 +81,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.security.EncryptionUtil;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
index b3556c6..9cc33d1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
@@ -45,8 +45,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.LoadQueueItem;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
[2/4] hbase git commit: HBASE-18699 Copy LoadIncrementalHFiles to
another package and mark the old one as deprecated
Posted by zh...@apache.org.
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
new file mode 100644
index 0000000..1f27d04
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
@@ -0,0 +1,1251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.tool;
+
+import static java.lang.String.format;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ClientServiceCallable;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
+import org.apache.hadoop.hbase.client.SecureBulkLoadClient;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.io.HalfStoreFileReader;
+import org.apache.hadoop.hbase.io.Reference;
+import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.token.FsDelegationToken;
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.HashMultimap;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Multimap;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Multimaps;
+import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSHDFSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Tool to load the output of HFileOutputFormat into an existing table.
+ */
+@InterfaceAudience.Public
+public class LoadIncrementalHFiles extends Configured implements Tool {
+
+ private static final Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
+
+ public static final String NAME = "completebulkload";
+ static final String RETRY_ON_IO_EXCEPTION = "hbase.bulkload.retries.retryOnIOException";
+ public static final String MAX_FILES_PER_REGION_PER_FAMILY =
+ "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily";
+ private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
+ public final static String CREATE_TABLE_CONF_KEY = "create.table";
+ public final static String IGNORE_UNMATCHED_CF_CONF_KEY = "ignore.unmatched.families";
+ public final static String ALWAYS_COPY_FILES = "always.copy.files";
+
+ // We use a '.' prefix which is ignored when walking directory trees
+  // above. It is an invalid family name.
+ static final String TMP_DIR = ".tmp";
+
+ private final int maxFilesPerRegionPerFamily;
+ private final boolean assignSeqIds;
+
+ // Source delegation token
+ private final FsDelegationToken fsDelegationToken;
+ private final UserProvider userProvider;
+ private final int nrThreads;
+ private final RpcControllerFactory rpcControllerFactory;
+
+ private String bulkToken;
+
+ /**
+   * Represents an HFile waiting to be loaded. A queue is used in this class in order to support
+ * the case where a region has split during the process of the load. When this happens, the HFile
+ * is split into two physical parts across the new region boundary, and each part is added back
+ * into the queue. The import process finishes when the queue is empty.
+ */
+ @InterfaceAudience.Public
+ public static class LoadQueueItem {
+ private final byte[] family;
+ private final Path hfilePath;
+
+ public LoadQueueItem(byte[] family, Path hfilePath) {
+ this.family = family;
+ this.hfilePath = hfilePath;
+ }
+
+ @Override
+ public String toString() {
+ return "family:" + Bytes.toString(family) + " path:" + hfilePath.toString();
+ }
+
+ public byte[] getFamily() {
+ return family;
+ }
+
+ public Path getFilePath() {
+ return hfilePath;
+ }
+ }
+
+ public LoadIncrementalHFiles(Configuration conf) {
+ // make a copy, just to be sure we're not overriding someone else's config
+ super(HBaseConfiguration.create(conf));
+ conf = getConf();
+ // disable blockcache for tool invocation, see HBASE-10500
+ conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
+ userProvider = UserProvider.instantiate(conf);
+ fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
+ assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
+ maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
+ nrThreads = conf.getInt("hbase.loadincremental.threads.max",
+ Runtime.getRuntime().availableProcessors());
+ rpcControllerFactory = new RpcControllerFactory(conf);
+ }
+
+ private void usage() {
+ System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename" + "\n -D" +
+ CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n" +
+ " Note: if you set this to 'no', then the target table must already exist in HBase\n -D" +
+ IGNORE_UNMATCHED_CF_CONF_KEY + "=yes - can be used to ignore unmatched column families\n" +
+ "\n");
+ }
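+
+  // Illustrative command line (the path and table name below are placeholders, not defaults):
+  //   hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles \
+  //       -Dcreate.table=no /user/hbase/hfof-output mytable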
+
+ /**
+   * Prepare a collection of {@link LoadQueueItem} from a list of source hfiles contained in the
+   * passed directory, and validate that every hfile in the prepared queue belongs to a valid
+   * column family of the table.
+ * @param hfilesDir directory containing list of hfiles to be loaded into the table
+ * @param table table to which hfiles should be loaded
+ * @param queue queue which needs to be loaded into the table
+   * @param validateHFile if true, hfiles will be validated for their format
+ * @throws IOException If any I/O or network error occurred
+ */
+ public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
+ boolean validateHFile) throws IOException {
+ prepareHFileQueue(hfilesDir, table, queue, validateHFile, false);
+ }
+
+ /**
+   * Prepare a collection of {@link LoadQueueItem} from a list of source hfiles contained in the
+   * passed directory, and validate that every hfile in the prepared queue belongs to a valid
+   * column family of the table.
+ * @param hfilesDir directory containing list of hfiles to be loaded into the table
+ * @param table table to which hfiles should be loaded
+ * @param queue queue which needs to be loaded into the table
+   * @param validateHFile if true, hfiles will be validated for their format
+ * @param silence true to ignore unmatched column families
+ * @throws IOException If any I/O or network error occurred
+ */
+ public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
+ boolean validateHFile, boolean silence) throws IOException {
+ discoverLoadQueue(queue, hfilesDir, validateHFile);
+ validateFamiliesInHFiles(table, queue, silence);
+ }
+
+ /**
+   * Prepare a collection of {@link LoadQueueItem} from the passed map of family to hfile paths,
+   * and validate that every hfile in the prepared queue belongs to a valid column family of the
+   * table.
+ * @param map map of family to List of hfiles
+ * @param table table to which hfiles should be loaded
+ * @param queue queue which needs to be loaded into the table
+ * @param silence true to ignore unmatched column families
+ * @throws IOException If any I/O or network error occurred
+ */
+ public void prepareHFileQueue(Map<byte[], List<Path>> map, Table table,
+ Deque<LoadQueueItem> queue, boolean silence) throws IOException {
+ populateLoadQueue(queue, map);
+ validateFamiliesInHFiles(table, queue, silence);
+ }
+
+ /**
+ * Perform a bulk load of the given directory into the given pre-existing table. This method is
+ * not threadsafe.
+ * @param hfofDir the directory that was provided as the output path of a job using
+ * HFileOutputFormat
+ * @param admin the Admin
+ * @param table the table to load into
+ * @param regionLocator region locator
+ * @throws TableNotFoundException if table does not yet exist
+ */
+ public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Path hfofDir, final Admin admin, Table table,
+ RegionLocator regionLocator) throws TableNotFoundException, IOException {
+ return doBulkLoad(hfofDir, admin, table, regionLocator, false, false);
+ }
+
+ /**
+ * Perform a bulk load of the given directory into the given pre-existing table. This method is
+ * not threadsafe.
+ * @param map map of family to List of hfiles
+ * @param admin the Admin
+ * @param table the table to load into
+ * @param regionLocator region locator
+ * @param silence true to ignore unmatched column families
+ * @param copyFile always copy hfiles if true
+ * @throws TableNotFoundException if table does not yet exist
+ */
+ public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Map<byte[], List<Path>> map, final Admin admin,
+ Table table, RegionLocator regionLocator, boolean silence, boolean copyFile)
+ throws TableNotFoundException, IOException {
+ if (!admin.isTableAvailable(regionLocator.getName())) {
+ throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
+ }
+ // LQI queue does not need to be threadsafe -- all operations on this queue
+ // happen in this thread
+ Deque<LoadQueueItem> queue = new ArrayDeque<>();
+ ExecutorService pool = null;
+ SecureBulkLoadClient secureClient = null;
+ try {
+ prepareHFileQueue(map, table, queue, silence);
+ if (queue.isEmpty()) {
+ LOG.warn("Bulk load operation did not get any files to load");
+ return Collections.emptyMap();
+ }
+ pool = createExecutorService();
+ secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
+ return performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
+ } finally {
+ cleanup(admin, queue, pool, secureClient);
+ }
+ }
+
+ /**
+ * Perform a bulk load of the given directory into the given pre-existing table. This method is
+ * not threadsafe.
+ * @param hfofDir the directory that was provided as the output path of a job using
+ * HFileOutputFormat
+ * @param admin the Admin
+ * @param table the table to load into
+ * @param regionLocator region locator
+ * @param silence true to ignore unmatched column families
+ * @param copyFile always copy hfiles if true
+ * @throws TableNotFoundException if table does not yet exist
+ */
+ public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Path hfofDir, final Admin admin, Table table,
+ RegionLocator regionLocator, boolean silence, boolean copyFile)
+ throws TableNotFoundException, IOException {
+ if (!admin.isTableAvailable(regionLocator.getName())) {
+ throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
+ }
+
+ /*
+ * Checking hfile format is a time-consuming operation, we should have an option to skip this
+ * step when bulkloading millions of HFiles. See HBASE-13985.
+ */
+ boolean validateHFile = getConf().getBoolean("hbase.loadincremental.validate.hfile", true);
+ if (!validateHFile) {
+ LOG.warn("You are skipping HFiles validation, it might cause some data loss if files " +
+ "are not correct. If you fail to read data from your table after using this " +
+ "option, consider removing the files and bulkload again without this option. " +
+ "See HBASE-13985");
+ }
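+    // For example, operators loading very large numbers of pre-verified hfiles can pass
+    // -Dhbase.loadincremental.validate.hfile=false to skip this per-file check (HBASE-13985).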
+ // LQI queue does not need to be threadsafe -- all operations on this queue
+ // happen in this thread
+ Deque<LoadQueueItem> queue = new ArrayDeque<>();
+ ExecutorService pool = null;
+ SecureBulkLoadClient secureClient = null;
+ try {
+ prepareHFileQueue(hfofDir, table, queue, validateHFile, silence);
+
+ if (queue.isEmpty()) {
+        LOG.warn("Bulk load operation did not find any files to load in directory " +
+            (hfofDir != null ? hfofDir.toUri() : "") +
+            ". Does it contain files in subdirectories that correspond to column family names?");
+ return Collections.emptyMap();
+ }
+ pool = createExecutorService();
+ secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
+ return performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
+ } finally {
+ cleanup(admin, queue, pool, secureClient);
+ }
+ }
+
+ /**
+   * Used by the replication sink to load the hfiles from the source cluster. It does the
+   * following:
+   * <ol>
+   * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)</li>
+   * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)
+   * </li>
+   * </ol>
+   * @param table Table to which these hfiles should be loaded
+   * @param conn Connection to use
+   * @param queue queue of {@link LoadQueueItem}s holding hfiles yet to be loaded
+   * @param startEndKeys starting and ending row keys of the region
+ */
+ public void loadHFileQueue(Table table, Connection conn, Deque<LoadQueueItem> queue,
+ Pair<byte[][], byte[][]> startEndKeys) throws IOException {
+ loadHFileQueue(table, conn, queue, startEndKeys, false);
+ }
+
+ /**
+   * Used by the replication sink to load the hfiles from the source cluster. It does the
+   * following:
+   * <ol>
+   * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)</li>
+   * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)
+   * </li>
+   * </ol>
+   * @param table Table to which these hfiles should be loaded
+   * @param conn Connection to use
+   * @param queue queue of {@link LoadQueueItem}s holding hfiles yet to be loaded
+   * @param startEndKeys starting and ending row keys of the region
+   * @param copyFile always copy hfiles if true
+ */
+ public void loadHFileQueue(Table table, Connection conn, Deque<LoadQueueItem> queue,
+ Pair<byte[][], byte[][]> startEndKeys, boolean copyFile) throws IOException {
+ ExecutorService pool = null;
+ try {
+ pool = createExecutorService();
+ Multimap<ByteBuffer, LoadQueueItem> regionGroups =
+ groupOrSplitPhase(table, pool, queue, startEndKeys).getFirst();
+ bulkLoadPhase(table, conn, pool, queue, regionGroups, copyFile, null);
+ } finally {
+ if (pool != null) {
+ pool.shutdown();
+ }
+ }
+ }
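+
+  // Sketch of the intended call sequence from the replication sink (names such as "loader",
+  // "familyToHFiles" and "regionLocator" are illustrative, not part of this class):
+  //   Deque<LoadQueueItem> queue = new ArrayDeque<>();
+  //   loader.prepareHFileQueue(familyToHFiles, table, queue, false);
+  //   loader.loadHFileQueue(table, conn, queue, regionLocator.getStartEndKeys());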
+
+ private Map<LoadQueueItem, ByteBuffer> performBulkLoad(Admin admin, Table table,
+ RegionLocator regionLocator, Deque<LoadQueueItem> queue, ExecutorService pool,
+ SecureBulkLoadClient secureClient, boolean copyFile) throws IOException {
+ int count = 0;
+
+ if (isSecureBulkLoadEndpointAvailable()) {
+ LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in future releases.");
+ LOG.warn("Secure bulk load has been integrated into HBase core.");
+ }
+
+ fsDelegationToken.acquireDelegationToken(queue.peek().getFilePath().getFileSystem(getConf()));
+ bulkToken = secureClient.prepareBulkLoad(admin.getConnection());
+ Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = null;
+
+ Map<LoadQueueItem, ByteBuffer> item2RegionMap = new HashMap<>();
+ // Assumes that region splits can happen while this occurs.
+ while (!queue.isEmpty()) {
+ // need to reload split keys each iteration.
+ final Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys();
+ if (count != 0) {
+ LOG.info("Split occurred while grouping HFiles, retry attempt " + +count + " with " +
+ queue.size() + " files remaining to group or split");
+ }
+
+ int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);
+ maxRetries = Math.max(maxRetries, startEndKeys.getFirst().length + 1);
+ if (maxRetries != 0 && count >= maxRetries) {
+ throw new IOException(
+ "Retry attempted " + count + " times without completing, bailing out");
+ }
+ count++;
+
+ // Using ByteBuffer for byte[] equality semantics
+ pair = groupOrSplitPhase(table, pool, queue, startEndKeys);
+ Multimap<ByteBuffer, LoadQueueItem> regionGroups = pair.getFirst();
+
+ if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
+ // Error is logged inside checkHFilesCountPerRegionPerFamily.
+ throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily +
+ " hfiles to one family of one region");
+ }
+
+ bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups, copyFile,
+ item2RegionMap);
+
+ // NOTE: The next iteration's split / group could happen in parallel to
+ // atomic bulkloads assuming that there are splits and no merges, and
+ // that we can atomically pull out the groups we want to retry.
+ }
+
+ if (!queue.isEmpty()) {
+      throw new RuntimeException("Bulk load aborted with some files not yet loaded. " +
+          "Please check log for more details.");
+ }
+ return item2RegionMap;
+ }
+
+ /**
+ * This takes the LQI's grouped by likely regions and attempts to bulk load them. Any failures are
+ * re-queued for another pass with the groupOrSplitPhase.
+ * <p>
+ * protected for testing.
+ */
+ @VisibleForTesting
+ protected void bulkLoadPhase(Table table, Connection conn, ExecutorService pool,
+ Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups,
+ boolean copyFile, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
+ // atomically bulk load the groups.
+ Set<Future<List<LoadQueueItem>>> loadingFutures = new HashSet<>();
+ for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> e : regionGroups.asMap()
+ .entrySet()) {
+ byte[] first = e.getKey().array();
+ Collection<LoadQueueItem> lqis = e.getValue();
+
+ ClientServiceCallable<byte[]> serviceCallable =
+ buildClientServiceCallable(conn, table.getName(), first, lqis, copyFile);
+
+ Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
+ @Override
+ public List<LoadQueueItem> call() throws Exception {
+ List<LoadQueueItem> toRetry =
+ tryAtomicRegionLoad(serviceCallable, table.getName(), first, lqis);
+ return toRetry;
+ }
+ };
+ if (item2RegionMap != null) {
+ for (LoadQueueItem lqi : lqis) {
+ item2RegionMap.put(lqi, e.getKey());
+ }
+ }
+ loadingFutures.add(pool.submit(call));
+ }
+
+ // get all the results.
+ for (Future<List<LoadQueueItem>> future : loadingFutures) {
+ try {
+ List<LoadQueueItem> toRetry = future.get();
+
+ if (item2RegionMap != null) {
+ for (LoadQueueItem lqi : toRetry) {
+ item2RegionMap.remove(lqi);
+ }
+ }
+ // LQIs that are requeued to be regrouped.
+ queue.addAll(toRetry);
+
+ } catch (ExecutionException e1) {
+ Throwable t = e1.getCause();
+ if (t instanceof IOException) {
+ // At this point something unrecoverable has happened.
+ // TODO Implement bulk load recovery
+ throw new IOException("BulkLoad encountered an unrecoverable problem", t);
+ }
+ LOG.error("Unexpected execution exception during bulk load", e1);
+ throw new IllegalStateException(t);
+ } catch (InterruptedException e1) {
+ LOG.error("Unexpected interrupted exception during bulk load", e1);
+ throw (InterruptedIOException) new InterruptedIOException().initCause(e1);
+ }
+ }
+ }
+
+ @VisibleForTesting
+ protected ClientServiceCallable<byte[]> buildClientServiceCallable(Connection conn,
+ TableName tableName, byte[] first, Collection<LoadQueueItem> lqis, boolean copyFile) {
+ List<Pair<byte[], String>> famPaths =
+ lqis.stream().map(lqi -> Pair.newPair(lqi.getFamily(), lqi.getFilePath().toString()))
+ .collect(Collectors.toList());
+ return new ClientServiceCallable<byte[]>(conn, tableName, first,
+ rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
+ @Override
+ protected byte[] rpcCall() throws Exception {
+ SecureBulkLoadClient secureClient = null;
+ boolean success = false;
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Going to connect to server " + getLocation() + " for row " +
+ Bytes.toStringBinary(getRow()) + " with hfile group " +
+ LoadIncrementalHFiles.this.toString(famPaths));
+ }
+ byte[] regionName = getLocation().getRegionInfo().getRegionName();
+ try (Table table = conn.getTable(getTableName())) {
+ secureClient = new SecureBulkLoadClient(getConf(), table);
+ success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
+ assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile);
+ }
+ return success ? regionName : null;
+ } finally {
+ // Best effort copying of files that might not have been imported
+ // from the staging directory back to original location
+ // in user directory
+ if (secureClient != null && !success) {
+ FileSystem targetFs = FileSystem.get(getConf());
+ FileSystem sourceFs = lqis.iterator().next().getFilePath().getFileSystem(getConf());
+ // Check to see if the source and target filesystems are the same
+ // If they are the same filesystem, we will try move the files back
+ // because previously we moved them to the staging directory.
+ if (FSHDFSUtils.isSameHdfs(getConf(), sourceFs, targetFs)) {
+ for (Pair<byte[], String> el : famPaths) {
+ Path hfileStagingPath = null;
+ Path hfileOrigPath = new Path(el.getSecond());
+ try {
+ hfileStagingPath = new Path(new Path(bulkToken, Bytes.toString(el.getFirst())),
+ hfileOrigPath.getName());
+ if (targetFs.rename(hfileStagingPath, hfileOrigPath)) {
+ LOG.debug("Moved back file " + hfileOrigPath + " from " + hfileStagingPath);
+ } else if (targetFs.exists(hfileStagingPath)) {
+ LOG.debug(
+ "Unable to move back file " + hfileOrigPath + " from " + hfileStagingPath);
+ }
+ } catch (Exception ex) {
+ LOG.debug(
+ "Unable to move back file " + hfileOrigPath + " from " + hfileStagingPath, ex);
+ }
+ }
+ }
+ }
+ }
+ }
+ };
+ }
+
+ private boolean checkHFilesCountPerRegionPerFamily(
+ final Multimap<ByteBuffer, LoadQueueItem> regionGroups) {
+ for (Map.Entry<ByteBuffer, Collection<LoadQueueItem>> e : regionGroups.asMap().entrySet()) {
+ Map<byte[], MutableInt> filesMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ for (LoadQueueItem lqi : e.getValue()) {
+ MutableInt count = filesMap.computeIfAbsent(lqi.getFamily(), k -> new MutableInt());
+ count.increment();
+ if (count.intValue() > maxFilesPerRegionPerFamily) {
+ LOG.error("Trying to load more than " + maxFilesPerRegionPerFamily +
+ " hfiles to family " + Bytes.toStringBinary(lqi.getFamily()) +
+ " of region with start key " + Bytes.toStringBinary(e.getKey()));
+ return false;
+ }
+ }
+ }
+ return true;
+ }
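+
+  // The cap enforced above defaults to 32 hfiles per family per region; it can be raised via
+  // hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily when a load legitimately needs more.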
+
+ /**
+ * @param table the table to load into
+ * @param pool the ExecutorService
+ * @param queue the queue for LoadQueueItem
+ * @param startEndKeys start and end keys
+ * @return A map that groups LQI by likely bulk load region targets and Set of missing hfiles.
+ */
+ private Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> groupOrSplitPhase(
+ final Table table, ExecutorService pool, Deque<LoadQueueItem> queue,
+ final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
+ // <region start key, LQI> need synchronized only within this scope of this
+ // phase because of the puts that happen in futures.
+ Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create();
+ final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);
+ Set<String> missingHFiles = new HashSet<>();
+ Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair =
+ new Pair<>(regionGroups, missingHFiles);
+
+ // drain LQIs and figure out bulk load groups
+ Set<Future<Pair<List<LoadQueueItem>, String>>> splittingFutures = new HashSet<>();
+ while (!queue.isEmpty()) {
+ final LoadQueueItem item = queue.remove();
+
+ final Callable<Pair<List<LoadQueueItem>, String>> call =
+ new Callable<Pair<List<LoadQueueItem>, String>>() {
+ @Override
+ public Pair<List<LoadQueueItem>, String> call() throws Exception {
+ Pair<List<LoadQueueItem>, String> splits =
+ groupOrSplit(regionGroups, item, table, startEndKeys);
+ return splits;
+ }
+ };
+ splittingFutures.add(pool.submit(call));
+ }
+ // get all the results. All grouping and splitting must finish before
+ // we can attempt the atomic loads.
+ for (Future<Pair<List<LoadQueueItem>, String>> lqis : splittingFutures) {
+ try {
+ Pair<List<LoadQueueItem>, String> splits = lqis.get();
+ if (splits != null) {
+ if (splits.getFirst() != null) {
+ queue.addAll(splits.getFirst());
+ } else {
+ missingHFiles.add(splits.getSecond());
+ }
+ }
+ } catch (ExecutionException e1) {
+ Throwable t = e1.getCause();
+ if (t instanceof IOException) {
+ LOG.error("IOException during splitting", e1);
+          throw (IOException) t; // would have been thrown if not parallelized
+ }
+ LOG.error("Unexpected execution exception during splitting", e1);
+ throw new IllegalStateException(t);
+ } catch (InterruptedException e1) {
+ LOG.error("Unexpected interrupted exception during splitting", e1);
+ throw (InterruptedIOException) new InterruptedIOException().initCause(e1);
+ }
+ }
+ return pair;
+ }
+
+ private List<LoadQueueItem> splitStoreFile(final LoadQueueItem item, final Table table,
+ byte[] startKey, byte[] splitKey) throws IOException {
+ Path hfilePath = item.getFilePath();
+ byte[] family = item.getFamily();
+ Path tmpDir = hfilePath.getParent();
+ if (!tmpDir.getName().equals(TMP_DIR)) {
+ tmpDir = new Path(tmpDir, TMP_DIR);
+ }
+
+ LOG.info("HFile at " + hfilePath + " no longer fits inside a single " + "region. Splitting...");
+
+ String uniqueName = getUniqueName();
+ ColumnFamilyDescriptor familyDesc = table.getDescriptor().getColumnFamily(family);
+
+ Path botOut = new Path(tmpDir, uniqueName + ".bottom");
+ Path topOut = new Path(tmpDir, uniqueName + ".top");
+ splitStoreFile(getConf(), hfilePath, familyDesc, splitKey, botOut, topOut);
+
+ FileSystem fs = tmpDir.getFileSystem(getConf());
+ fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx"));
+ fs.setPermission(botOut, FsPermission.valueOf("-rwxrwxrwx"));
+ fs.setPermission(topOut, FsPermission.valueOf("-rwxrwxrwx"));
+
+ // Add these back at the *front* of the queue, so there's a lower
+ // chance that the region will just split again before we get there.
+ List<LoadQueueItem> lqis = new ArrayList<>(2);
+ lqis.add(new LoadQueueItem(family, botOut));
+ lqis.add(new LoadQueueItem(family, topOut));
+
+ // If the current item is already the result of previous splits,
+ // we don't need it anymore. Clean up to save space.
+ // It is not part of the original input files.
+ try {
+      // tmpDir was reassigned above and always ends in .tmp at this point; check the item's
+      // own parent (as the old implementation did) so only intermediate split outputs are
+      // deleted, never the original input files.
+      if (hfilePath.getParent().getName().equals(TMP_DIR)) {
+ fs.delete(hfilePath, false);
+ }
+ } catch (IOException e) {
+ LOG.warn("Unable to delete temporary split file " + hfilePath);
+ }
+ LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
+ return lqis;
+ }
+
+ /**
+ * Attempt to assign the given load queue item into its target region group. If the hfile boundary
+ * no longer fits into a region, physically splits the hfile such that the new bottom half will
+ * fit and returns the list of LQI's corresponding to the resultant hfiles.
+ * <p>
+ * protected for testing
+ * @throws IOException if an IO failure is encountered
+ */
+ @VisibleForTesting
+ protected Pair<List<LoadQueueItem>, String> groupOrSplit(
+ Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item, final Table table,
+ final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
+ Path hfilePath = item.getFilePath();
+ byte[] first, last;
+ try (HFile.Reader hfr = HFile.createReader(hfilePath.getFileSystem(getConf()), hfilePath,
+ new CacheConfig(getConf()), true, getConf())) {
+ hfr.loadFileInfo();
+ first = hfr.getFirstRowKey();
+ last = hfr.getLastRowKey();
+ } catch (FileNotFoundException fnfe) {
+ LOG.debug("encountered", fnfe);
+ return new Pair<>(null, hfilePath.getName());
+ }
+
+ LOG.info("Trying to load hfile=" + hfilePath + " first=" + Bytes.toStringBinary(first) +
+ " last=" + Bytes.toStringBinary(last));
+ if (first == null || last == null) {
+ assert first == null && last == null;
+ // TODO what if this is due to a bad HFile?
+ LOG.info("hfile " + hfilePath + " has no entries, skipping");
+ return null;
+ }
+ if (Bytes.compareTo(first, last) > 0) {
+ throw new IllegalArgumentException(
+ "Invalid range: " + Bytes.toStringBinary(first) + " > " + Bytes.toStringBinary(last));
+ }
+ int idx = Arrays.binarySearch(startEndKeys.getFirst(), first, Bytes.BYTES_COMPARATOR);
+ if (idx < 0) {
+ // not on boundary, returns -(insertion index). Calculate region it
+ // would be in.
+ idx = -(idx + 1) - 1;
+ }
+ int indexForCallable = idx;
+
+ /**
+     * We can consider there to be a region hole in the following conditions: 1) if idx < 0, then
+     * the first region info is lost; 2) if the endkey of a region is not equal to the startkey of
+     * the next region; 3) if the endkey of the last region is not empty.
+ */
+ if (indexForCallable < 0) {
+ throw new IOException("The first region info for table " + table.getName() +
+ " can't be found in hbase:meta.Please use hbck tool to fix it first.");
+ } else if ((indexForCallable == startEndKeys.getFirst().length - 1) &&
+ !Bytes.equals(startEndKeys.getSecond()[indexForCallable], HConstants.EMPTY_BYTE_ARRAY)) {
+ throw new IOException("The last region info for table " + table.getName() +
+ " can't be found in hbase:meta.Please use hbck tool to fix it first.");
+ } else if (indexForCallable + 1 < startEndKeys.getFirst().length &&
+ !(Bytes.compareTo(startEndKeys.getSecond()[indexForCallable],
+ startEndKeys.getFirst()[indexForCallable + 1]) == 0)) {
+ throw new IOException("The endkey of one region for table " + table.getName() +
+ " is not equal to the startkey of the next region in hbase:meta." +
+ "Please use hbck tool to fix it first.");
+ }
+
+ boolean lastKeyInRange = Bytes.compareTo(last, startEndKeys.getSecond()[idx]) < 0 ||
+ Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY);
+ if (!lastKeyInRange) {
+ List<LoadQueueItem> lqis = splitStoreFile(item, table,
+ startEndKeys.getFirst()[indexForCallable], startEndKeys.getSecond()[indexForCallable]);
+ return new Pair<>(lqis, null);
+ }
+
+ // group regions.
+ regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[idx]), item);
+ return null;
+ }
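+
+  // Return contract: null means the item was grouped into regionGroups (or the hfile was empty
+  // and skipped); a pair with a non-null first element carries the split halves to re-queue; a
+  // pair with a non-null second element names an hfile missing from the source filesystem.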
+
+ /**
+ * Attempts to do an atomic load of many hfiles into a region. If it fails, it returns a list of
+ * hfiles that need to be retried. If it is successful it will return an empty list.
+ * <p>
+   * NOTE: To maintain row atomicity guarantees, the region server callable should succeed or
+   * fail atomically.
+ * <p>
+ * Protected for testing.
+ * @return empty list if success, list of items to retry on recoverable failure
+ */
+ @VisibleForTesting
+ protected List<LoadQueueItem> tryAtomicRegionLoad(ClientServiceCallable<byte[]> serviceCallable,
+ final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis)
+ throws IOException {
+ try {
+ List<LoadQueueItem> toRetry = new ArrayList<>();
+ Configuration conf = getConf();
+ byte[] region = RpcRetryingCallerFactory.instantiate(conf, null).<byte[]> newCaller()
+ .callWithRetries(serviceCallable, Integer.MAX_VALUE);
+ if (region == null) {
+ LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first) +
+ " into table " + tableName + " with files " + lqis +
+ " failed. This is recoverable and they will be retried.");
+ toRetry.addAll(lqis); // return lqi's to retry
+ }
+ // success
+ return toRetry;
+ } catch (IOException e) {
+ LOG.error("Encountered unrecoverable error from region server, additional details: " +
+ serviceCallable.getExceptionMessageAdditionalDetail(),
+ e);
+ throw e;
+ }
+ }
+
+ /**
+   * If the table is created for the first time, then "completebulkload" reads the files twice.
+   * More modifications would be necessary if we wanted to avoid doing it.
+ */
+ private void createTable(TableName tableName, String dirPath, Admin admin) throws IOException {
+ final Path hfofDir = new Path(dirPath);
+ final FileSystem fs = hfofDir.getFileSystem(getConf());
+
+ // Add column families
+ // Build a set of keys
+ List<ColumnFamilyDescriptorBuilder> familyBuilders = new ArrayList<>();
+ SortedMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<ColumnFamilyDescriptorBuilder>() {
+ @Override
+ public ColumnFamilyDescriptorBuilder bulkFamily(byte[] familyName) {
+ ColumnFamilyDescriptorBuilder builder =
+ ColumnFamilyDescriptorBuilder.newBuilder(familyName);
+ familyBuilders.add(builder);
+ return builder;
+ }
+
+ @Override
+ public void bulkHFile(ColumnFamilyDescriptorBuilder builder, FileStatus hfileStatus)
+ throws IOException {
+ Path hfile = hfileStatus.getPath();
+ try (HFile.Reader reader =
+ HFile.createReader(fs, hfile, new CacheConfig(getConf()), true, getConf())) {
+ if (builder.getCompressionType() != reader.getFileContext().getCompression()) {
+ builder.setCompressionType(reader.getFileContext().getCompression());
+ LOG.info("Setting compression " + reader.getFileContext().getCompression().name() +
+ " for family " + builder.getNameAsString());
+ }
+ reader.loadFileInfo();
+ byte[] first = reader.getFirstRowKey();
+ byte[] last = reader.getLastRowKey();
+
+ LOG.info("Trying to figure out region boundaries hfile=" + hfile + " first=" +
+ Bytes.toStringBinary(first) + " last=" + Bytes.toStringBinary(last));
+
+ // To eventually infer start key-end key boundaries
+ Integer value = map.containsKey(first) ? map.get(first) : 0;
+ map.put(first, value + 1);
+
+ value = map.containsKey(last) ? map.get(last) : 0;
+ map.put(last, value - 1);
+ }
+ }
+ });
+
+ byte[][] keys = inferBoundaries(map);
+ TableDescriptorBuilder tdBuilder = TableDescriptorBuilder.newBuilder(tableName);
+ familyBuilders.stream().map(ColumnFamilyDescriptorBuilder::build)
+ .forEachOrdered(tdBuilder::addColumnFamily);
+ admin.createTable(tdBuilder.build(), keys);
+
+ LOG.info("Table " + tableName + " is available!!");
+ }
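+
+  // "Reads the files twice" above: once here to infer region boundaries, and once more during
+  // the load itself, when each hfile is reopened to group it into its target region.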
+
+ private void cleanup(Admin admin, Deque<LoadQueueItem> queue, ExecutorService pool,
+ SecureBulkLoadClient secureClient) throws IOException {
+ fsDelegationToken.releaseDelegationToken();
+ if (bulkToken != null && secureClient != null) {
+ secureClient.cleanupBulkLoad(admin.getConnection(), bulkToken);
+ }
+ if (pool != null) {
+ pool.shutdown();
+ }
+ if (!queue.isEmpty()) {
+ StringBuilder err = new StringBuilder();
+ err.append("-------------------------------------------------\n");
+ err.append("Bulk load aborted with some files not yet loaded:\n");
+ err.append("-------------------------------------------------\n");
+ for (LoadQueueItem q : queue) {
+ err.append(" ").append(q.getFilePath()).append('\n');
+ }
+ LOG.error(err);
+ }
+ }
+
+ // unique file name for the table
+ private String getUniqueName() {
+ return UUID.randomUUID().toString().replaceAll("-", "");
+ }
+
+ /**
+ * Checks whether there is any invalid family name in HFiles to be bulk loaded.
+ */
+ private void validateFamiliesInHFiles(Table table, Deque<LoadQueueItem> queue, boolean silence)
+ throws IOException {
+ Set<String> familyNames = Arrays.asList(table.getDescriptor().getColumnFamilies()).stream()
+ .map(f -> f.getNameAsString()).collect(Collectors.toSet());
+ List<String> unmatchedFamilies = queue.stream().map(item -> Bytes.toString(item.getFamily()))
+ .filter(fn -> !familyNames.contains(fn)).distinct().collect(Collectors.toList());
+ if (unmatchedFamilies.size() > 0) {
+ String msg =
+ "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: " +
+ unmatchedFamilies + "; valid family names of table " + table.getName() + " are: " +
+ familyNames;
+ LOG.error(msg);
+ if (!silence) {
+ throw new IOException(msg);
+ }
+ }
+ }
+
+ /**
+ * Populate the queue with the given HFiles.
+ */
+ private void populateLoadQueue(Deque<LoadQueueItem> ret, Map<byte[], List<Path>> map) {
+ map.forEach((k, v) -> v.stream().map(p -> new LoadQueueItem(k, p)).forEachOrdered(ret::add));
+ }
+
+ /**
+ * Walk the given directory for all HFiles and add matching entries to the given queue.
+ */
+ private void discoverLoadQueue(final Deque<LoadQueueItem> ret, final Path hfofDir,
+ final boolean validateHFile) throws IOException {
+ visitBulkHFiles(hfofDir.getFileSystem(getConf()), hfofDir, new BulkHFileVisitor<byte[]>() {
+ @Override
+ public byte[] bulkFamily(final byte[] familyName) {
+ return familyName;
+ }
+
+ @Override
+ public void bulkHFile(final byte[] family, final FileStatus hfile) throws IOException {
+ long length = hfile.getLen();
+ if (length > getConf().getLong(HConstants.HREGION_MAX_FILESIZE,
+ HConstants.DEFAULT_MAX_FILE_SIZE)) {
+ LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size: " + length +
+ " bytes can be problematic as it may lead to oversplitting.");
+ }
+ ret.add(new LoadQueueItem(family, hfile.getPath()));
+ }
+ }, validateHFile);
+ }
+
+ private interface BulkHFileVisitor<TFamily> {
+
+ TFamily bulkFamily(byte[] familyName) throws IOException;
+
+ void bulkHFile(TFamily family, FileStatus hfileStatus) throws IOException;
+ }
+
+ /**
+ * Iterate over the bulkDir hfiles. Skip reference, HFileLink, files starting with "_" and
+ * non-valid hfiles.
+ */
+ private static <TFamily> void visitBulkHFiles(final FileSystem fs, final Path bulkDir,
+ final BulkHFileVisitor<TFamily> visitor) throws IOException {
+ visitBulkHFiles(fs, bulkDir, visitor, true);
+ }
+
+ /**
+ * Iterate over the bulkDir hfiles. Skip reference, HFileLink, files starting with "_". Check and
+ * skip non-valid hfiles by default, or skip this validation by setting
+ * 'hbase.loadincremental.validate.hfile' to false.
+ */
+ private static <TFamily> void visitBulkHFiles(FileSystem fs, Path bulkDir,
+ BulkHFileVisitor<TFamily> visitor, boolean validateHFile) throws IOException {
+ FileStatus[] familyDirStatuses = fs.listStatus(bulkDir);
+ for (FileStatus familyStat : familyDirStatuses) {
+ if (!familyStat.isDirectory()) {
+ LOG.warn("Skipping non-directory " + familyStat.getPath());
+ continue;
+ }
+ Path familyDir = familyStat.getPath();
+ byte[] familyName = Bytes.toBytes(familyDir.getName());
+ // Skip invalid family
+ try {
+ ColumnFamilyDescriptorBuilder.isLegalColumnFamilyName(familyName);
+ } catch (IllegalArgumentException e) {
+ LOG.warn("Skipping invalid " + familyStat.getPath());
+ continue;
+ }
+ TFamily family = visitor.bulkFamily(familyName);
+
+ FileStatus[] hfileStatuses = fs.listStatus(familyDir);
+ for (FileStatus hfileStatus : hfileStatuses) {
+ if (!fs.isFile(hfileStatus.getPath())) {
+ LOG.warn("Skipping non-file " + hfileStatus);
+ continue;
+ }
+
+ Path hfile = hfileStatus.getPath();
+ // Skip "_", reference, HFileLink
+ String fileName = hfile.getName();
+ if (fileName.startsWith("_")) {
+ continue;
+ }
+ if (StoreFileInfo.isReference(fileName)) {
+ LOG.warn("Skipping reference " + fileName);
+ continue;
+ }
+ if (HFileLink.isHFileLink(fileName)) {
+ LOG.warn("Skipping HFileLink " + fileName);
+ continue;
+ }
+
+ // Validate HFile Format if needed
+ if (validateHFile) {
+ try {
+ if (!HFile.isHFileFormat(fs, hfile)) {
+ LOG.warn("the file " + hfile + " doesn't seems to be an hfile. skipping");
+ continue;
+ }
+ } catch (FileNotFoundException e) {
+ LOG.warn("the file " + hfile + " was removed");
+ continue;
+ }
+ }
+
+ visitor.bulkHFile(family, hfileStatus);
+ }
+ }
+ }
+
+ // Initialize a thread pool
+ private ExecutorService createExecutorService() {
+ ThreadPoolExecutor pool = new ThreadPoolExecutor(nrThreads, nrThreads, 60, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ new ThreadFactoryBuilder().setNameFormat("LoadIncrementalHFiles-%1$d").build());
+ pool.allowCoreThreadTimeOut(true);
+ return pool;
+ }
+
+ private final String toString(List<Pair<byte[], String>> list) {
+ StringBuilder sb = new StringBuilder();
+ sb.append('[');
+ list.forEach(p -> {
+ sb.append('{').append(Bytes.toStringBinary(p.getFirst())).append(',').append(p.getSecond())
+ .append('}');
+ });
+ sb.append(']');
+ return sb.toString();
+ }
+
+ private boolean isSecureBulkLoadEndpointAvailable() {
+ String classes = getConf().get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
+ return classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
+ }
+
+ /**
+ * Split a storefile into a top and bottom half, maintaining the metadata, recreating bloom
+ * filters, etc.
+ */
+ @VisibleForTesting
+ static void splitStoreFile(Configuration conf, Path inFile, ColumnFamilyDescriptor familyDesc,
+ byte[] splitKey, Path bottomOut, Path topOut) throws IOException {
+ // Open reader with no block cache, and not in-memory
+ Reference topReference = Reference.createTopReference(splitKey);
+ Reference bottomReference = Reference.createBottomReference(splitKey);
+
+ copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
+ copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
+ }
+
+ /**
+ * Copy half of an HFile into a new HFile.
+ */
+ private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile,
+ Reference reference, ColumnFamilyDescriptor familyDescriptor) throws IOException {
+ FileSystem fs = inFile.getFileSystem(conf);
+ CacheConfig cacheConf = new CacheConfig(conf);
+ HalfStoreFileReader halfReader = null;
+ StoreFileWriter halfWriter = null;
+ try {
+ halfReader = new HalfStoreFileReader(fs, inFile, cacheConf, reference, true,
+ new AtomicInteger(0), true, conf);
+ Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();
+
+ int blocksize = familyDescriptor.getBlocksize();
+ Algorithm compression = familyDescriptor.getCompressionType();
+ BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
+ HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
+ .withChecksumType(HStore.getChecksumType(conf))
+ .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(blocksize)
+ .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()).withIncludesTags(true)
+ .build();
+ halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
+ .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
+ HFileScanner scanner = halfReader.getScanner(false, false, false);
+ scanner.seekTo();
+ do {
+ halfWriter.append(scanner.getCell());
+ } while (scanner.next());
+
+ for (Map.Entry<byte[], byte[]> entry : fileInfo.entrySet()) {
+ if (shouldCopyHFileMetaKey(entry.getKey())) {
+ halfWriter.appendFileInfo(entry.getKey(), entry.getValue());
+ }
+ }
+ } finally {
+ if (halfReader != null) {
+ try {
+ halfReader.close(cacheConf.shouldEvictOnClose());
+ } catch (IOException e) {
+ LOG.warn("failed to close hfile reader for " + inFile, e);
+ }
+ }
+ if (halfWriter != null) {
+ halfWriter.close();
+ }
+
+ }
+ }
+
+ private static boolean shouldCopyHFileMetaKey(byte[] key) {
+ // skip encoding to keep hfile meta consistent with data block info, see HBASE-15085
+ if (Bytes.equals(key, HFileDataBlockEncoder.DATA_BLOCK_ENCODING)) {
+ return false;
+ }
+
+ return !HFile.isReservedFileInfoKey(key);
+ }
+
+ private boolean isCreateTable() {
+ return "yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"));
+ }
+
+ private boolean isSilence() {
+ return "yes".equalsIgnoreCase(getConf().get(IGNORE_UNMATCHED_CF_CONF_KEY, ""));
+ }
+
+ private boolean isAlwaysCopyFiles() {
+ return getConf().getBoolean(ALWAYS_COPY_FILES, false);
+ }
+
+ /**
+ * Perform bulk load on the given table.
+ * @param hfofDir the directory that was provided as the output path of a job using
+ * HFileOutputFormat
+ * @param tableName the table to load into
+ */
+ public Map<LoadQueueItem, ByteBuffer> run(String hfofDir, TableName tableName)
+ throws IOException {
+ try (Connection connection = ConnectionFactory.createConnection(getConf());
+ Admin admin = connection.getAdmin()) {
+ if (!admin.tableExists(tableName)) {
+ if (isCreateTable()) {
+ createTable(tableName, hfofDir, admin);
+ } else {
+ String errorMsg = format("Table '%s' does not exist.", tableName);
+ LOG.error(errorMsg);
+ throw new TableNotFoundException(errorMsg);
+ }
+ }
+ try (Table table = connection.getTable(tableName);
+ RegionLocator locator = connection.getRegionLocator(tableName)) {
+ return doBulkLoad(new Path(hfofDir), admin, table, locator, isSilence(), isAlwaysCopyFiles());
+ }
+ }
+ }
+
+ /**
+ * Perform bulk load on the given table.
+ * @param family2Files map of family to List of hfiles
+ * @param tableName the table to load into
+ */
+ public Map<LoadQueueItem, ByteBuffer> run(Map<byte[], List<Path>> family2Files,
+ TableName tableName) throws IOException {
+ try (Connection connection = ConnectionFactory.createConnection(getConf());
+ Admin admin = connection.getAdmin()) {
+ if (!admin.tableExists(tableName)) {
+ String errorMsg = format("Table '%s' does not exist.", tableName);
+ LOG.error(errorMsg);
+ throw new TableNotFoundException(errorMsg);
+ }
+ try (Table table = connection.getTable(tableName);
+ RegionLocator locator = connection.getRegionLocator(tableName)) {
+ return doBulkLoad(family2Files, admin, table, locator, isSilence(), isAlwaysCopyFiles());
+ }
+ }
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ if (args.length < 2) {
+ usage();
+ return -1;
+ }
+ String dirPath = args[0];
+ TableName tableName = TableName.valueOf(args[1]);
+ return !run(dirPath, tableName).isEmpty() ? 0 : -1;
+ }
+
+ public static void main(String[] args) throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ int ret = ToolRunner.run(conf, new LoadIncrementalHFiles(conf), args);
+ System.exit(ret);
+ }
+
+ /**
+ * Called from the replication sink, where it manages the bulkToken (staging directory) by
+ * itself. This is used only when SecureBulkLoadEndpoint is configured in the
+ * hbase.coprocessor.region.classes property. The staging directory is a temporary directory
+ * to which all files are initially copied or moved from the user-given directory, where the
+ * required file permissions are set, and from which they are finally loaded into the table.
+ * Set this only if you want to manage the staging directory yourself; otherwise this tool
+ * handles it on its own.
+ * @param stagingDir staging directory path
+ */
+ public void setBulkToken(String stagingDir) {
+ this.bulkToken = stagingDir;
+ }
+
+ /**
+ * Infers region boundaries for a new table.
+ * <p>
+ * Parameter: <br>
+ * bdryMap is a map from each key to the sum of +1 for every file that starts at that key
+ * and -1 for every file that ends at it
+ * <ul>
+ * <li>A key that is only the start key of a file maps to +1</li>
+ * <li>A key that is only the end key of a file maps to -1</li>
+ * </ul>
+ * <p>
+ * Algorithm:<br>
+ * <ol>
+ * <li>Iterate over the keys in sorted order:
+ * <ol type="a">
+ * <li>Keep adding the mapped values to a running sum (runningSum)</li>
+ * <li>Each time runningSum returns to 0, add the start key recorded when runningSum last
+ * left 0 to a boundary list.</li>
+ * </ol>
+ * </li>
+ * <li>Return the boundary list.</li>
+ * </ol>
+ */
+ public static byte[][] inferBoundaries(SortedMap<byte[], Integer> bdryMap) {
+ List<byte[]> keysArray = new ArrayList<>();
+ int runningValue = 0;
+ byte[] currStartKey = null;
+ boolean firstBoundary = true;
+
+ for (Map.Entry<byte[], Integer> item : bdryMap.entrySet()) {
+ if (runningValue == 0) {
+ currStartKey = item.getKey();
+ }
+ runningValue += item.getValue();
+ if (runningValue == 0) {
+ if (!firstBoundary) {
+ keysArray.add(currStartKey);
+ }
+ firstBoundary = false;
+ }
+ }
+
+ return keysArray.toArray(new byte[0][]);
+ }
+}
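As a side note on the boundary counting implemented above, here is a minimal standalone
sketch (the class name, helper, and key ranges are illustrative, not part of this commit)
that feeds a hand-built boundary map to the public inferBoundaries method:

import java.util.SortedMap;
import java.util.TreeMap;

import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;

public class InferBoundariesDemo {

  // Record one HFile's key range: +1 at its first row key, -1 at its last.
  private static void addRange(SortedMap<byte[], Integer> map, String first, String last) {
    map.merge(Bytes.toBytes(first), 1, Integer::sum);
    map.merge(Bytes.toBytes(last), -1, Integer::sum);
  }

  public static void main(String[] args) {
    SortedMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    // Two overlapping clusters of files: a-e/c-i/g-k and m-q/o-s.
    addRange(map, "a", "e");
    addRange(map, "c", "i");
    addRange(map, "g", "k");
    addRange(map, "m", "q");
    addRange(map, "o", "s");
    // The running sum returns to zero after "k", so "m", where it next leaves zero,
    // is the single inferred split key.
    for (byte[] split : LoadIncrementalHFiles.inferBoundaries(map)) {
      System.out.println(Bytes.toStringBinary(split)); // prints: m
    }
  }
}

The start key of the very first cluster is intentionally skipped, since the first region of
a newly created table always begins at the empty start key.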
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
index bc663e1..886d0da 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
@@ -3484,7 +3484,7 @@ public class HBaseFsck extends Configured implements Closeable {
errors.print("This sidelined region dir should be bulk loaded: "
+ path.toString());
errors.print("Bulk load command looks like: "
- + "hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles "
+ + "hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles "
+ path.toUri().getPath() + " "+ tableName);
}
}
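For completeness, the relocated tool can also be driven programmatically rather than through
the shell command printed above; a hedged sketch, with placeholder sidelined path and table
name:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;

public class BulkLoadSidelinedDir {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    // Equivalent to: hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles <dir> <table>
    loader.run("/hbase/.hbck/sidelined-region-dir", TableName.valueOf("mytable"));
  }
}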
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
index c4924bb..3b7f1f8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
@@ -63,7 +63,6 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.NoLimitScannerContext;
@@ -72,9 +71,9 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
@@ -86,6 +85,8 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+
@Category({ CoprocessorTests.class, MediumTests.class })
public class TestRegionObserverInterface {
private static final Log LOG = LogFactory.getLog(TestRegionObserverInterface.class);
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
deleted file mode 100644
index b5b7a0c..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
+++ /dev/null
@@ -1,763 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.NamespaceDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileScanner;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.LoadQueueItem;
-import org.apache.hadoop.hbase.regionserver.BloomType;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.testclassification.MapReduceTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.HFileTestUtil;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-
-/**
- * Test cases for the "load" half of the HFileOutputFormat bulk load
- * functionality. These tests run faster than the full MR cluster
- * tests in TestHFileOutputFormat.
- */
-@Category({MapReduceTests.class, LargeTests.class})
-public class TestLoadIncrementalHFiles {
- @Rule
- public TestName tn = new TestName();
-
- private static final byte[] QUALIFIER = Bytes.toBytes("myqual");
- private static final byte[] FAMILY = Bytes.toBytes("myfam");
- private static final String NAMESPACE = "bulkNS";
-
- static final String EXPECTED_MSG_FOR_NON_EXISTING_FAMILY = "Unmatched family names found";
- static final int MAX_FILES_PER_REGION_PER_FAMILY = 4;
-
- private static final byte[][] SPLIT_KEYS = new byte[][] {
- Bytes.toBytes("ddd"),
- Bytes.toBytes("ppp")
- };
-
- static HBaseTestingUtility util = new HBaseTestingUtility();
-
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,"");
- util.getConfiguration().setInt(
- LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
- MAX_FILES_PER_REGION_PER_FAMILY);
- // change default behavior so that tag values are returned with normal rpcs
- util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
- KeyValueCodecWithTags.class.getCanonicalName());
- util.startMiniCluster();
-
- setupNamespace();
- }
-
- protected static void setupNamespace() throws Exception {
- util.getAdmin().createNamespace(NamespaceDescriptor.create(NAMESPACE).build());
- }
-
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- util.shutdownMiniCluster();
- }
-
- @Test(timeout = 120000)
- public void testSimpleLoadWithMap() throws Exception {
- runTest("testSimpleLoadWithMap", BloomType.NONE,
- new byte[][][] {
- new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
- new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
- }, true);
- }
-
- /**
- * Test case that creates some regions and loads
- * HFiles that fit snugly inside those regions
- */
- @Test(timeout = 120000)
- public void testSimpleLoad() throws Exception {
- runTest("testSimpleLoad", BloomType.NONE,
- new byte[][][] {
- new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
- new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
- });
- }
-
- @Test(timeout = 120000)
- public void testSimpleLoadWithFileCopy() throws Exception {
- String testName = tn.getMethodName();
- final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName);
- runTest(testName, buildHTD(TableName.valueOf(TABLE_NAME), BloomType.NONE), BloomType.NONE,
- false, null, new byte[][][] {
- new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
- new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
- }, false, true);
- }
-
- /**
- * Test case that creates some regions and loads
- * HFiles that cross the boundaries of those regions
- */
- @Test(timeout = 120000)
- public void testRegionCrossingLoad() throws Exception {
- runTest("testRegionCrossingLoad", BloomType.NONE,
- new byte[][][] {
- new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
- new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
- });
- }
-
- /**
- * Test loading into a column family that has a ROW bloom filter.
- */
- @Test(timeout = 60000)
- public void testRegionCrossingRowBloom() throws Exception {
- runTest("testRegionCrossingLoadRowBloom", BloomType.ROW,
- new byte[][][] {
- new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
- new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
- });
- }
-
- /**
- * Test loading into a column family that has a ROWCOL bloom filter.
- */
- @Test(timeout = 120000)
- public void testRegionCrossingRowColBloom() throws Exception {
- runTest("testRegionCrossingLoadRowColBloom", BloomType.ROWCOL,
- new byte[][][] {
- new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
- new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
- });
- }
-
- /**
- * Test case that creates some regions and loads HFiles that have
- * different region boundaries than the table pre-split.
- */
- @Test(timeout = 120000)
- public void testSimpleHFileSplit() throws Exception {
- runTest("testHFileSplit", BloomType.NONE,
- new byte[][] {
- Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
- Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"),
- },
- new byte[][][] {
- new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("lll") },
- new byte[][]{ Bytes.toBytes("mmm"), Bytes.toBytes("zzz") },
- }
- );
- }
-
- /**
- * Test case that creates some regions and loads HFiles that cross the boundaries
- * and have different region boundaries than the table pre-split.
- */
- @Test(timeout = 60000)
- public void testRegionCrossingHFileSplit() throws Exception {
- testRegionCrossingHFileSplit(BloomType.NONE);
- }
-
- /**
- * Test case that creates some regions and loads HFiles that cross the boundaries,
- * have a ROW bloom filter, and have different region boundaries than the table pre-split.
- */
- @Test(timeout = 120000)
- public void testRegionCrossingHFileSplitRowBloom() throws Exception {
- testRegionCrossingHFileSplit(BloomType.ROW);
- }
-
- /**
- * Test case that creates some regions and loads HFiles that cross the boundaries,
- * have a ROWCOL bloom filter, and have different region boundaries than the table pre-split.
- */
- @Test(timeout = 120000)
- public void testRegionCrossingHFileSplitRowColBloom() throws Exception {
- testRegionCrossingHFileSplit(BloomType.ROWCOL);
- }
-
- @Test
- public void testSplitALot() throws Exception {
- runTest("testSplitALot", BloomType.NONE,
- new byte[][] {
- Bytes.toBytes("aaaa"), Bytes.toBytes("bbb"),
- Bytes.toBytes("ccc"), Bytes.toBytes("ddd"),
- Bytes.toBytes("eee"), Bytes.toBytes("fff"),
- Bytes.toBytes("ggg"), Bytes.toBytes("hhh"),
- Bytes.toBytes("iii"), Bytes.toBytes("lll"),
- Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
- Bytes.toBytes("ooo"), Bytes.toBytes("ppp"),
- Bytes.toBytes("qqq"), Bytes.toBytes("rrr"),
- Bytes.toBytes("sss"), Bytes.toBytes("ttt"),
- Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
- Bytes.toBytes("zzz"),
- },
- new byte[][][] {
- new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("zzz") },
- }
- );
- }
-
- private void testRegionCrossingHFileSplit(BloomType bloomType) throws Exception {
- runTest("testHFileSplit" + bloomType + "Bloom", bloomType,
- new byte[][] {
- Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
- Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"),
- },
- new byte[][][] {
- new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
- new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
- }
- );
- }
-
- private HTableDescriptor buildHTD(TableName tableName, BloomType bloomType) {
- HTableDescriptor htd = new HTableDescriptor(tableName);
- HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY);
- familyDesc.setBloomFilterType(bloomType);
- htd.addFamily(familyDesc);
- return htd;
- }
-
- private void runTest(String testName, BloomType bloomType,
- byte[][][] hfileRanges) throws Exception {
- runTest(testName, bloomType, null, hfileRanges);
- }
-
- private void runTest(String testName, BloomType bloomType,
- byte[][][] hfileRanges, boolean useMap) throws Exception {
- runTest(testName, bloomType, null, hfileRanges, useMap);
- }
-
- private void runTest(String testName, BloomType bloomType,
- byte[][] tableSplitKeys, byte[][][] hfileRanges) throws Exception {
- runTest(testName, bloomType, tableSplitKeys, hfileRanges, false);
- }
-
- private void runTest(String testName, BloomType bloomType,
- byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap) throws Exception {
- final byte[] TABLE_NAME = Bytes.toBytes("mytable_"+testName);
- final boolean preCreateTable = tableSplitKeys != null;
-
- // Run the test bulkloading the table to the default namespace
- final TableName TABLE_WITHOUT_NS = TableName.valueOf(TABLE_NAME);
- runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges,
- useMap);
-
- // Run the test bulkloading the table to the specified namespace
- final TableName TABLE_WITH_NS = TableName.valueOf(Bytes.toBytes(NAMESPACE), TABLE_NAME);
- runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges,
- useMap);
- }
-
- private void runTest(String testName, TableName tableName, BloomType bloomType,
- boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap)
- throws Exception {
- HTableDescriptor htd = buildHTD(tableName, bloomType);
- runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap, false);
- }
-
- public static int loadHFiles(String testName, HTableDescriptor htd, HBaseTestingUtility util,
- byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys,
- byte[][][] hfileRanges, boolean useMap, boolean deleteFile,
- boolean copyFiles, int initRowCount, int factor) throws Exception {
- Path dir = util.getDataTestDirOnTestFS(testName);
- FileSystem fs = util.getTestFileSystem();
- dir = dir.makeQualified(fs);
- Path familyDir = new Path(dir, Bytes.toString(fam));
-
- int hfileIdx = 0;
- Map<byte[], List<Path>> map = null;
- List<Path> list = null;
- if (useMap || copyFiles) {
- list = new ArrayList<>();
- }
- if (useMap) {
- map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
- map.put(fam, list);
- }
- Path last = null;
- for (byte[][] range : hfileRanges) {
- byte[] from = range[0];
- byte[] to = range[1];
- Path path = new Path(familyDir, "hfile_" + hfileIdx++);
- HFileTestUtil.createHFile(util.getConfiguration(), fs, path, fam, qual, from, to, factor);
- if (useMap) {
- last = path;
- list.add(path);
- }
- }
- int expectedRows = hfileIdx * factor;
-
- final TableName tableName = htd.getTableName();
- if (!util.getHBaseAdmin().tableExists(tableName) && (preCreateTable || map != null)) {
- util.getAdmin().createTable(htd, tableSplitKeys);
- }
-
- Configuration conf = util.getConfiguration();
- if (copyFiles) {
- conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true);
- }
- LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
- String [] args= {dir.toString(), tableName.toString()};
- if (useMap) {
- if (deleteFile) fs.delete(last);
- Map<LoadQueueItem, ByteBuffer> loaded = loader.run(null, map, tableName);
- if (deleteFile) {
- expectedRows -= 1000;
- for (LoadQueueItem item : loaded.keySet()) {
- if (item.hfilePath.getName().equals(last.getName())) {
- fail(last + " should be missing");
- }
- }
- }
- } else {
- loader.run(args);
- }
-
- if (copyFiles) {
- for (Path p : list) {
- assertTrue(p + " should exist", fs.exists(p));
- }
- }
-
- Table table = util.getConnection().getTable(tableName);
- try {
- assertEquals(initRowCount + expectedRows, util.countRows(table));
- } finally {
- table.close();
- }
-
- return expectedRows;
- }
-
- private void runTest(String testName, HTableDescriptor htd, BloomType bloomType,
- boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap,
- boolean copyFiles) throws Exception {
- loadHFiles(testName, htd, util, FAMILY, QUALIFIER, preCreateTable, tableSplitKeys,
- hfileRanges, useMap, true, copyFiles, 0, 1000);
-
- final TableName tableName = htd.getTableName();
- // verify staging folder has been cleaned up
- Path stagingBasePath = new Path(FSUtils.getRootDir(util.getConfiguration()), HConstants.BULKLOAD_STAGING_DIR_NAME);
- FileSystem fs = util.getTestFileSystem();
- if(fs.exists(stagingBasePath)) {
- FileStatus[] files = fs.listStatus(stagingBasePath);
- for(FileStatus file : files) {
- assertTrue("Folder=" + file.getPath() + " is not cleaned up.",
- file.getPath().getName() != "DONOTERASE");
- }
- }
-
- util.deleteTable(tableName);
- }
-
- /**
- * Test that tags survive through a bulk load that needs to split hfiles.
- *
- * This test depends on the "hbase.client.rpc.codec" = KeyValueCodecWithTags so that the client
- * can get tags in the responses.
- */
- @Test(timeout = 60000)
- public void testTagsSurviveBulkLoadSplit() throws Exception {
- Path dir = util.getDataTestDirOnTestFS(tn.getMethodName());
- FileSystem fs = util.getTestFileSystem();
- dir = dir.makeQualified(fs);
- Path familyDir = new Path(dir, Bytes.toString(FAMILY));
- // table has these split points
- byte [][] tableSplitKeys = new byte[][] {
- Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
- Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"),
- };
-
- // creating an hfile that has values that span the split points.
- byte[] from = Bytes.toBytes("ddd");
- byte[] to = Bytes.toBytes("ooo");
- HFileTestUtil.createHFileWithTags(util.getConfiguration(), fs,
- new Path(familyDir, tn.getMethodName()+"_hfile"),
- FAMILY, QUALIFIER, from, to, 1000);
- int expectedRows = 1000;
-
- TableName tableName = TableName.valueOf(tn.getMethodName());
- HTableDescriptor htd = buildHTD(tableName, BloomType.NONE);
- util.getAdmin().createTable(htd, tableSplitKeys);
-
- LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
- String [] args= {dir.toString(), tableName.toString()};
- loader.run(args);
-
- Table table = util.getConnection().getTable(tableName);
- try {
- assertEquals(expectedRows, util.countRows(table));
- HFileTestUtil.verifyTags(table);
- } finally {
- table.close();
- }
-
- util.deleteTable(tableName);
- }
-
- /**
- * Test loading into a column family that does not exist.
- */
- @Test(timeout = 60000)
- public void testNonexistentColumnFamilyLoad() throws Exception {
- String testName = tn.getMethodName();
- byte[][][] hFileRanges = new byte[][][] {
- new byte[][]{ Bytes.toBytes("aaa"), Bytes.toBytes("ccc") },
- new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
- };
-
- final byte[] TABLE = Bytes.toBytes("mytable_"+testName);
- HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE));
- // set the real family name to upper case on purpose to simulate the case where
- // the family name in the HFiles is invalid
- HColumnDescriptor family =
- new HColumnDescriptor(Bytes.toBytes(new String(FAMILY).toUpperCase(Locale.ROOT)));
- htd.addFamily(family);
-
- try {
- runTest(testName, htd, BloomType.NONE, true, SPLIT_KEYS, hFileRanges, false, false);
- assertTrue("Loading into table with non-existent family should have failed", false);
- } catch (Exception e) {
- assertTrue("IOException expected", e instanceof IOException);
- // further check whether the exception message is correct
- String errMsg = e.getMessage();
- assertTrue("Incorrect exception message, expected message: ["
- + EXPECTED_MSG_FOR_NON_EXISTING_FAMILY + "], current message: [" + errMsg + "]",
- errMsg.contains(EXPECTED_MSG_FOR_NON_EXISTING_FAMILY));
- }
- }
-
- @Test(timeout = 120000)
- public void testNonHfileFolderWithUnmatchedFamilyName() throws Exception {
- testNonHfileFolder("testNonHfileFolderWithUnmatchedFamilyName", true);
- }
-
- @Test(timeout = 120000)
- public void testNonHfileFolder() throws Exception {
- testNonHfileFolder("testNonHfileFolder", false);
- }
-
- /**
- * Write a random data file and a non-file in a dir with a valid family name
- * that is not part of the table families. We should be able to bulk load without
- * getting the unmatched family exception. HBASE-13037/HBASE-13227
- */
- private void testNonHfileFolder(String tableName, boolean preCreateTable) throws Exception {
- Path dir = util.getDataTestDirOnTestFS(tableName);
- FileSystem fs = util.getTestFileSystem();
- dir = dir.makeQualified(fs);
-
- Path familyDir = new Path(dir, Bytes.toString(FAMILY));
- HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_0"),
- FAMILY, QUALIFIER, Bytes.toBytes("begin"), Bytes.toBytes("end"), 500);
- createRandomDataFile(fs, new Path(familyDir, "012356789"), 16 * 1024);
-
- final String NON_FAMILY_FOLDER = "_logs";
- Path nonFamilyDir = new Path(dir, NON_FAMILY_FOLDER);
- fs.mkdirs(nonFamilyDir);
- fs.mkdirs(new Path(nonFamilyDir, "non-file"));
- createRandomDataFile(fs, new Path(nonFamilyDir, "012356789"), 16 * 1024);
-
- Table table = null;
- try {
- if (preCreateTable) {
- table = util.createTable(TableName.valueOf(tableName), FAMILY);
- } else {
- table = util.getConnection().getTable(TableName.valueOf(tableName));
- }
-
- final String[] args = {dir.toString(), tableName};
- new LoadIncrementalHFiles(util.getConfiguration()).run(args);
- assertEquals(500, util.countRows(table));
- } finally {
- if (table != null) {
- table.close();
- }
- fs.delete(dir, true);
- }
- }
-
- private static void createRandomDataFile(FileSystem fs, Path path, int size)
- throws IOException {
- FSDataOutputStream stream = fs.create(path);
- try {
- byte[] data = new byte[1024];
- for (int i = 0; i < data.length; ++i) {
- data[i] = (byte)(i & 0xff);
- }
- while (size >= data.length) {
- stream.write(data, 0, data.length);
- size -= data.length;
- }
- if (size > 0) {
- stream.write(data, 0, size);
- }
- } finally {
- stream.close();
- }
- }
-
- @Test(timeout = 120000)
- public void testSplitStoreFile() throws IOException {
- Path dir = util.getDataTestDirOnTestFS("testSplitHFile");
- FileSystem fs = util.getTestFileSystem();
- Path testIn = new Path(dir, "testhfile");
- HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY);
- HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
- Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
-
- Path bottomOut = new Path(dir, "bottom.out");
- Path topOut = new Path(dir, "top.out");
-
- LoadIncrementalHFiles.splitStoreFile(
- util.getConfiguration(), testIn,
- familyDesc, Bytes.toBytes("ggg"),
- bottomOut,
- topOut);
-
- int rowCount = verifyHFile(bottomOut);
- rowCount += verifyHFile(topOut);
- assertEquals(1000, rowCount);
- }
-
- @Test
- public void testSplitStoreFileWithNoneToNone() throws IOException {
- testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.NONE);
- }
-
- @Test
- public void testSplitStoreFileWithEncodedToEncoded() throws IOException {
- testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.DIFF);
- }
-
- @Test
- public void testSplitStoreFileWithEncodedToNone() throws IOException {
- testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.NONE);
- }
-
- @Test
- public void testSplitStoreFileWithNoneToEncoded() throws IOException {
- testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.DIFF);
- }
-
- private void testSplitStoreFileWithDifferentEncoding(DataBlockEncoding bulkloadEncoding,
- DataBlockEncoding cfEncoding) throws IOException {
- Path dir = util.getDataTestDirOnTestFS("testSplitHFileWithDifferentEncoding");
- FileSystem fs = util.getTestFileSystem();
- Path testIn = new Path(dir, "testhfile");
- HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY);
- familyDesc.setDataBlockEncoding(cfEncoding);
- HFileTestUtil.createHFileWithDataBlockEncoding(
- util.getConfiguration(), fs, testIn, bulkloadEncoding,
- FAMILY, QUALIFIER, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
-
- Path bottomOut = new Path(dir, "bottom.out");
- Path topOut = new Path(dir, "top.out");
-
- LoadIncrementalHFiles.splitStoreFile(
- util.getConfiguration(), testIn,
- familyDesc, Bytes.toBytes("ggg"),
- bottomOut,
- topOut);
-
- int rowCount = verifyHFile(bottomOut);
- rowCount += verifyHFile(topOut);
- assertEquals(1000, rowCount);
- }
-
- private int verifyHFile(Path p) throws IOException {
- Configuration conf = util.getConfiguration();
- HFile.Reader reader =
- HFile.createReader(p.getFileSystem(conf), p, new CacheConfig(conf), true, conf);
- reader.loadFileInfo();
- HFileScanner scanner = reader.getScanner(false, false);
- scanner.seekTo();
- int count = 0;
- do {
- count++;
- } while (scanner.next());
- assertTrue(count > 0);
- reader.close();
- return count;
- }
-
- private void addStartEndKeysForTest(TreeMap<byte[], Integer> map, byte[] first, byte[] last) {
- Integer value = map.containsKey(first)?map.get(first):0;
- map.put(first, value+1);
-
- value = map.containsKey(last)?map.get(last):0;
- map.put(last, value-1);
- }
-
- @Test(timeout = 120000)
- public void testInferBoundaries() {
- TreeMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-
- /* Toy example
- * c---------i o------p s---------t v------x
- * a------e g-----k m-------------q r----s u----w
- *
- * Should be inferred as:
- * a-----------------k m-------------q r--------------t u---------x
- *
- * The output should be (m,r,u)
- */
-
- String first;
- String last;
-
- first = "a"; last = "e";
- addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
-
- first = "r"; last = "s";
- addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
-
- first = "o"; last = "p";
- addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
-
- first = "g"; last = "k";
- addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
-
- first = "v"; last = "x";
- addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
-
- first = "c"; last = "i";
- addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
-
- first = "m"; last = "q";
- addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
-
- first = "s"; last = "t";
- addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
-
- first = "u"; last = "w";
- addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
-
- byte[][] keysArray = LoadIncrementalHFiles.inferBoundaries(map);
- byte[][] compare = new byte[3][];
- compare[0] = "m".getBytes();
- compare[1] = "r".getBytes();
- compare[2] = "u".getBytes();
-
- assertEquals(keysArray.length, 3);
-
- for (int row = 0; row<keysArray.length; row++){
- assertArrayEquals(keysArray[row], compare[row]);
- }
- }
-
- @Test(timeout = 60000)
- public void testLoadTooMayHFiles() throws Exception {
- Path dir = util.getDataTestDirOnTestFS("testLoadTooMayHFiles");
- FileSystem fs = util.getTestFileSystem();
- dir = dir.makeQualified(fs);
- Path familyDir = new Path(dir, Bytes.toString(FAMILY));
-
- byte[] from = Bytes.toBytes("begin");
- byte[] to = Bytes.toBytes("end");
- for (int i = 0; i <= MAX_FILES_PER_REGION_PER_FAMILY; i++) {
- HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
- + i), FAMILY, QUALIFIER, from, to, 1000);
- }
-
- LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
- String [] args= {dir.toString(), "mytable_testLoadTooMayHFiles"};
- try {
- loader.run(args);
- fail("Bulk loading too many files should fail");
- } catch (IOException ie) {
- assertTrue(ie.getMessage().contains("Trying to load more than "
- + MAX_FILES_PER_REGION_PER_FAMILY + " hfiles"));
- }
- }
-
- @Test(expected = TableNotFoundException.class)
- public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception {
- Configuration conf = util.getConfiguration();
- conf.set(LoadIncrementalHFiles.CREATE_TABLE_CONF_KEY, "no");
- LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
- String[] args = { "directory", "nonExistingTable" };
- loader.run(args);
- }
-
- @Test(timeout = 120000)
- public void testTableWithCFNameStartWithUnderScore() throws Exception {
- Path dir = util.getDataTestDirOnTestFS("cfNameStartWithUnderScore");
- FileSystem fs = util.getTestFileSystem();
- dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
- String family = "_cf";
- Path familyDir = new Path(dir, family);
-
- byte[] from = Bytes.toBytes("begin");
- byte[] to = Bytes.toBytes("end");
- Configuration conf = util.getConfiguration();
- String tableName = tn.getMethodName();
- Table table = util.createTable(TableName.valueOf(tableName), family);
- HFileTestUtil.createHFile(conf, fs, new Path(familyDir, "hfile"), Bytes.toBytes(family),
- QUALIFIER, from, to, 1000);
-
- LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
- String[] args = { dir.toString(), tableName };
- try {
- loader.run(args);
- assertEquals(1000, util.countRows(table));
- } finally {
- if (null != table) {
- table.close();
- }
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
index 07dd2a9..5dce4ad 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
@@ -40,9 +40,9 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.Assert;
[4/4] hbase git commit: HBASE-18699 Copy LoadIncrementalHFiles to
another package and mark the old one as deprecated
Posted by zh...@apache.org.
HBASE-18699 Copy LoadIncrementalHFiles to another package and mark the old one as deprecated
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9e53f292
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9e53f292
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9e53f292
Branch: refs/heads/master
Commit: 9e53f2927b3154eb703560933ddad489c2e232b5
Parents: 7c51d3f
Author: zhangduo <zh...@apache.org>
Authored: Fri Sep 1 20:27:16 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sun Sep 3 19:49:42 2017 +0800
----------------------------------------------------------------------
.../hbase/backup/impl/RestoreTablesClient.java | 6 +-
.../backup/mapreduce/MapReduceRestoreJob.java | 2 +-
.../hadoop/hbase/backup/util/BackupUtils.java | 2 +-
.../hadoop/hbase/backup/util/RestoreTool.java | 2 +-
.../hadoop/hbase/backup/TestBackupBase.java | 2 +-
.../TestIncrementalBackupWithBulkLoad.java | 5 +-
.../client/ColumnFamilyDescriptorBuilder.java | 10 +-
.../hbase/coprocessor/TestSecureExport.java | 2 +-
...ReplicationSyncUpToolWithBulkLoadedData.java | 2 +-
.../mapreduce/IntegrationTestBulkLoad.java | 23 +-
.../mapreduce/IntegrationTestImportTsv.java | 5 +-
.../hadoop/hbase/mapreduce/CopyTable.java | 1 +
.../apache/hadoop/hbase/mapreduce/Driver.java | 1 +
.../hbase/mapreduce/HRegionPartitioner.java | 2 +-
...opSecurityEnabledUserProviderForTesting.java | 41 -
.../hbase/mapreduce/TestHFileOutputFormat2.java | 6 +-
.../TestLoadIncrementalHFilesSplitRecovery.java | 669 ---------
.../TestSecureLoadIncrementalHFiles.java | 70 -
...ecureLoadIncrementalHFilesSplitRecovery.java | 69 -
.../snapshot/TestMobSecureExportSnapshot.java | 2 +-
.../snapshot/TestSecureExportSnapshot.java | 2 +-
.../hbase/mapreduce/LoadIncrementalHFiles.java | 1284 +-----------------
.../compactions/PartitionedMobCompactor.java | 2 +-
.../regionserver/HFileReplicator.java | 4 +-
.../hbase/tool/LoadIncrementalHFiles.java | 1251 +++++++++++++++++
.../org/apache/hadoop/hbase/util/HBaseFsck.java | 2 +-
.../TestRegionObserverInterface.java | 5 +-
.../mapreduce/TestLoadIncrementalHFiles.java | 763 -----------
.../regionserver/TestScannerWithBulkload.java | 2 +-
.../replication/TestMasterReplication.java | 2 +-
...opSecurityEnabledUserProviderForTesting.java | 41 +
.../security/access/TestAccessController.java | 19 +-
.../hadoop/hbase/tool/MapreduceTestingShim.java | 171 +++
.../hbase/tool/TestLoadIncrementalHFiles.java | 723 ++++++++++
.../TestLoadIncrementalHFilesSplitRecovery.java | 628 +++++++++
.../tool/TestSecureLoadIncrementalHFiles.java | 66 +
...ecureLoadIncrementalHFilesSplitRecovery.java | 66 +
.../spark/IntegrationTestSparkBulkLoad.java | 2 +-
.../hbasecontext/JavaHBaseBulkLoadExample.java | 2 +-
.../hbase/spark/TestJavaHBaseContext.java | 2 +-
.../hadoop/hbase/spark/BulkLoadSuite.scala | 2 +-
src/main/asciidoc/_chapters/ops_mgt.adoc | 2 +-
42 files changed, 3041 insertions(+), 2922 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
index ea7a7b8..ff79533 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
@@ -44,8 +44,8 @@ import org.apache.hadoop.hbase.backup.util.RestoreTool;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.LoadQueueItem;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem;
/**
* Restore table implementation
@@ -231,7 +231,7 @@ public class RestoreTablesClient {
LoadIncrementalHFiles loader = BackupUtils.createLoader(conf);
for (int i = 0; i < sTableList.size(); i++) {
if (mapForSrc[i] != null && !mapForSrc[i].isEmpty()) {
- loaderResult = loader.run(null, mapForSrc[i], tTableArray[i]);
+ loaderResult = loader.run(mapForSrc[i], tTableArray[i]);
LOG.debug("bulk loading " + sTableList.get(i) + " to " + tTableArray[i]);
if (loaderResult.isEmpty()) {
String msg = "Couldn't bulk load for " + sTableList.get(i) + " to " + tTableArray[i];
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
index 1209e7c..93ea2e5 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.RestoreJob;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.util.Tool;
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
index 11a1a3d..74bfb6c 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
@@ -56,7 +56,7 @@ import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
index 2e311cf..ab56aaa 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
@@ -46,7 +46,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
index 7fe9a61..8752ca2 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
@@ -59,7 +59,7 @@ import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.mapreduce.HadoopSecurityEnabledUserProviderForTesting;
+import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.access.SecureTestUtil;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java
index 769785f..f63bf29 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java
@@ -36,8 +36,9 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.mapreduce.TestLoadIncrementalHFiles;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.tool.TestLoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Assert;
@@ -46,8 +47,6 @@ import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
-
/**
* 1. Create table t1
* 2. Load data to t1
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptorBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptorBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptorBuilder.java
index d25f9d1..784a250 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptorBuilder.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptorBuilder.java
@@ -420,6 +420,10 @@ public class ColumnFamilyDescriptorBuilder {
return this;
}
+ public String getNameAsString() {
+ return desc.getNameAsString();
+ }
+
public ColumnFamilyDescriptorBuilder setBlockCacheEnabled(boolean value) {
desc.setBlockCacheEnabled(value);
return this;
@@ -470,6 +474,10 @@ public class ColumnFamilyDescriptorBuilder {
return this;
}
+ public Compression.Algorithm getCompressionType() {
+ return desc.getCompressionType();
+ }
+
public ColumnFamilyDescriptorBuilder setConfiguration(final String key, final String value) {
desc.setConfiguration(key, value);
return this;
@@ -610,7 +618,7 @@ public class ColumnFamilyDescriptorBuilder {
*/
@InterfaceAudience.Private
public ModifyableColumnFamilyDescriptor(final byte[] name) {
- this(isLegalColumnFamilyName(name), getDefaultValuesBytes(), Collections.EMPTY_MAP);
+ this(isLegalColumnFamilyName(name), getDefaultValuesBytes(), Collections.emptyMap());
}
/**
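A small sketch of the builder getters added here, mirroring how
LoadIncrementalHFiles.createTable reads state back from the builder while visiting HFiles
(the GZ compression choice is illustrative only):

import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.util.Bytes;

public class BuilderGetterDemo {
  public static void main(String[] args) {
    ColumnFamilyDescriptorBuilder builder =
        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("cf"));
    // Only change the compression if it differs from what the builder already holds.
    if (builder.getCompressionType() != Compression.Algorithm.GZ) {
      builder.setCompressionType(Compression.Algorithm.GZ);
    }
    System.out.println(builder.getNameAsString()); // prints: cf
  }
}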
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestSecureExport.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestSecureExport.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestSecureExport.java
index 66d99dd..e4cd54d 100644
--- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestSecureExport.java
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestSecureExport.java
@@ -48,10 +48,10 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.http.ssl.KeyStoreTestUtil;
import org.apache.hadoop.hbase.mapreduce.ExportUtils;
-import org.apache.hadoop.hbase.mapreduce.HadoopSecurityEnabledUserProviderForTesting;
import org.apache.hadoop.hbase.mapreduce.Import;
import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos;
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
+import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.access.AccessControlConstants;
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
index 75f8ee2..0b33d20 100644
--- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
index 52f1223..cd84163 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
@@ -20,9 +20,16 @@ package org.apache.hadoop.hbase.mapreduce;
import static org.junit.Assert.assertEquals;
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Joiner;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.logging.Log;
@@ -53,6 +60,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.RegionSplitter;
@@ -77,15 +85,8 @@ import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Joiner;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
/**
* Test Bulk Load and MR on a distributed cluster.
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
index fb7acf4..246cb5b 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
@@ -32,21 +32,22 @@ import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
-import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.IntegrationTests;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.Tool;
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
index 9cccf8c..513beb4 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
index 1c69e77..dc5214e 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.util.ProgramDriver;
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java
index 3c3060b..5a8ead2 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java
@@ -42,7 +42,7 @@ import org.apache.hadoop.mapreduce.Partitioner;
*
* <p>This class is not suitable as a partitioner for creating hfiles
* for incremental bulk loads, as region spread will likely change between the time of
- * hfile creation and load time. See {@link LoadIncrementalHFiles}
+ * hfile creation and load time. See {@link org.apache.hadoop.hbase.tool.LoadIncrementalHFiles}
* and <a href="http://hbase.apache.org/book.html#arch.bulk.load">Bulk Load</a>.</p>
*
* @param <KEY> The type of the key.
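
With the javadoc now pointing at the relocated class, a bulk load against the tool package looks the same as before; only the import changes. A minimal sketch, assuming a hypothetical table "t1" and HFile directory /tmp/hfiles; the doBulkLoad signature is the one the tests in this patch call:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;

public class BulkLoadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    TableName tableName = TableName.valueOf("t1"); // hypothetical table
    Path hfileDir = new Path("/tmp/hfiles");       // hypothetical HFile directory
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(tableName);
        RegionLocator locator = conn.getRegionLocator(tableName);
        Admin admin = conn.getAdmin()) {
      // Same call pattern the tests in this patch use, just against the new package.
      new LoadIncrementalHFiles(conf).doBulkLoad(hfileDir, admin, table, locator);
    }
  }
}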
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/HadoopSecurityEnabledUserProviderForTesting.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/HadoopSecurityEnabledUserProviderForTesting.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/HadoopSecurityEnabledUserProviderForTesting.java
deleted file mode 100644
index b342f64..0000000
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/HadoopSecurityEnabledUserProviderForTesting.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import org.apache.hadoop.hbase.security.UserProvider;
-
-/**
- * A {@link UserProvider} that always says hadoop security is enabled, regardless of the underlying
- * configuration. HBase security is <i>not enabled</i> as this is used to determine if SASL is used
- * to do the authentication, which requires a Kerberos ticket (which we currently don't have in
- * tests).
- * <p>
- * This should only be used for <b>TESTING</b>.
- */
-public class HadoopSecurityEnabledUserProviderForTesting extends UserProvider {
-
- @Override
- public boolean isHBaseSecurityEnabled() {
- return false;
- }
-
- @Override
- public boolean isHadoopSecurityEnabled() {
- return true;
- }
-}
\ No newline at end of file
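
The file is not gone: per the TestSecureExport and snapshot-test hunks in this patch, it now lives at org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting. Tests wire it in through UserProvider.setUserProviderForTesting, as the secure suites below do. A minimal sketch of that wiring:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting;
import org.apache.hadoop.hbase.security.UserProvider;

public class UserProviderWiringSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Force the "hadoop security on, hbase security off" provider for this config.
    UserProvider.setUserProviderForTesting(conf,
        HadoopSecurityEnabledUserProviderForTesting.class);
    UserProvider provider = UserProvider.instantiate(conf);
    System.out.println(provider.isHadoopSecurityEnabled()); // -> true
    System.out.println(provider.isHBaseSecurityEnabled());  // -> false
  }
}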
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index c6a8761..cbff2de 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -36,6 +36,8 @@ import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -88,6 +90,7 @@ import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils;
@@ -109,9 +112,6 @@ import org.junit.experimental.categories.Category;
import org.junit.rules.TestRule;
import org.mockito.Mockito;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
/**
* Simple test for {@link HFileOutputFormat2}.
* Sets up and runs a mapreduce job that writes hfile output.
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
deleted file mode 100644
index 529a448..0000000
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
+++ /dev/null
@@ -1,669 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Deque;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableExistsException;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ClientServiceCallable;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.testclassification.MapReduceTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.Pair;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.mockito.Mockito;
-
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Multimap;
-
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
-
-/**
- * Test cases for the atomic load error handling of the bulk load functionality.
- */
-@Category({MapReduceTests.class, LargeTests.class})
-public class TestLoadIncrementalHFilesSplitRecovery {
- private static final Log LOG = LogFactory.getLog(TestLoadIncrementalHFilesSplitRecovery.class);
-
- static HBaseTestingUtility util;
- //used by secure subclass
- static boolean useSecure = false;
-
- final static int NUM_CFS = 10;
- final static byte[] QUAL = Bytes.toBytes("qual");
- final static int ROWCOUNT = 100;
-
- private final static byte[][] families = new byte[NUM_CFS][];
-
- @Rule
- public TestName name = new TestName();
-
- static {
- for (int i = 0; i < NUM_CFS; i++) {
- families[i] = Bytes.toBytes(family(i));
- }
- }
-
- static byte[] rowkey(int i) {
- return Bytes.toBytes(String.format("row_%08d", i));
- }
-
- static String family(int i) {
- return String.format("family_%04d", i);
- }
-
- static byte[] value(int i) {
- return Bytes.toBytes(String.format("%010d", i));
- }
-
- public static void buildHFiles(FileSystem fs, Path dir, int value)
- throws IOException {
- byte[] val = value(value);
- for (int i = 0; i < NUM_CFS; i++) {
- Path testIn = new Path(dir, family(i));
-
- TestHRegionServerBulkLoad.createHFile(fs, new Path(testIn, "hfile_" + i),
- Bytes.toBytes(family(i)), QUAL, val, ROWCOUNT);
- }
- }
-
- /**
- * Creates a table with given table name and specified number of column
- * families if the table does not already exist.
- */
- private void setupTable(final Connection connection, TableName table, int cfs)
- throws IOException {
- try {
- LOG.info("Creating table " + table);
- HTableDescriptor htd = new HTableDescriptor(table);
- for (int i = 0; i < cfs; i++) {
- htd.addFamily(new HColumnDescriptor(family(i)));
- }
- try (Admin admin = connection.getAdmin()) {
- admin.createTable(htd);
- }
- } catch (TableExistsException tee) {
- LOG.info("Table " + table + " already exists");
- }
- }
-
- /**
- * Creates a table with the given table name, the specified number of column families,<br>
- * and split keys if the table does not already exist.
- * @param table
- * @param cfs
- * @param SPLIT_KEYS
- */
- private void setupTableWithSplitkeys(TableName table, int cfs, byte[][] SPLIT_KEYS)
- throws IOException {
- try {
- LOG.info("Creating table " + table);
- HTableDescriptor htd = new HTableDescriptor(table);
- for (int i = 0; i < cfs; i++) {
- htd.addFamily(new HColumnDescriptor(family(i)));
- }
-
- util.createTable(htd, SPLIT_KEYS);
- } catch (TableExistsException tee) {
- LOG.info("Table " + table + " already exists");
- }
- }
-
- private Path buildBulkFiles(TableName table, int value) throws Exception {
- Path dir = util.getDataTestDirOnTestFS(table.getNameAsString());
- Path bulk1 = new Path(dir, table.getNameAsString() + value);
- FileSystem fs = util.getTestFileSystem();
- buildHFiles(fs, bulk1, value);
- return bulk1;
- }
-
- /**
- * Populate table with known values.
- */
- private void populateTable(final Connection connection, TableName table, int value)
- throws Exception {
- // create HFiles for different column families
- LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration());
- Path bulk1 = buildBulkFiles(table, value);
- try (Table t = connection.getTable(table);
- RegionLocator locator = connection.getRegionLocator(table);
- Admin admin = connection.getAdmin()) {
- lih.doBulkLoad(bulk1, admin, t, locator);
- }
- }
-
- /**
- * Split the known table in half. (this is hard coded for this test suite)
- */
- private void forceSplit(TableName table) {
- try {
- // need the call to the region server to be synchronous, but that isn't visible from here.
- HRegionServer hrs = util.getRSForFirstRegionInTable(table);
-
- for (HRegionInfo hri :
- ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
- if (hri.getTable().equals(table)) {
- util.getAdmin().splitRegionAsync(hri.getRegionName(), rowkey(ROWCOUNT / 2));
- //ProtobufUtil.split(null, hrs.getRSRpcServices(), hri, rowkey(ROWCOUNT / 2));
- }
- }
-
- // verify that split completed.
- int regions;
- do {
- regions = 0;
- for (HRegionInfo hri :
- ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
- if (hri.getTable().equals(table)) {
- regions++;
- }
- }
- if (regions != 2) {
- LOG.info("Taking some time to complete split...");
- Thread.sleep(250);
- }
- } while (regions != 2);
- } catch (IOException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- @BeforeClass
- public static void setupCluster() throws Exception {
- util = new HBaseTestingUtility();
- util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
- util.startMiniCluster(1);
- }
-
- @AfterClass
- public static void teardownCluster() throws Exception {
- util.shutdownMiniCluster();
- }
-
- /**
- * Checks that all columns have the expected value and that there is the
- * expected number of rows.
- * @throws IOException
- */
- void assertExpectedTable(TableName table, int count, int value) throws IOException {
- HTableDescriptor [] htds = util.getAdmin().listTables(table.getNameAsString());
- assertEquals(1, htds.length);
- Table t = null;
- try {
- t = util.getConnection().getTable(table);
- Scan s = new Scan();
- ResultScanner sr = t.getScanner(s);
- int i = 0;
- for (Result r : sr) {
- i++;
- for (NavigableMap<byte[], byte[]> nm : r.getNoVersionMap().values()) {
- for (byte[] val : nm.values()) {
- assertTrue(Bytes.equals(val, value(value)));
- }
- }
- }
- assertEquals(count, i);
- } catch (IOException e) {
- fail("Failed due to exception");
- } finally {
- if (t != null) t.close();
- }
- }
-
- /**
- * Test showing that an exception thrown from the RS side results in an
- * exception on the LIHFile client.
- */
- @Test(expected=IOException.class, timeout=120000)
- public void testBulkLoadPhaseFailure() throws Exception {
- final TableName table = TableName.valueOf(name.getMethodName());
- final AtomicInteger attemptedCalls = new AtomicInteger();
- final AtomicInteger failedCalls = new AtomicInteger();
- util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
- try (Connection connection = ConnectionFactory.createConnection(util
- .getConfiguration())) {
- setupTable(connection, table, 10);
- LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
- util.getConfiguration()) {
- @Override
- protected List<LoadQueueItem> tryAtomicRegionLoad(
- ClientServiceCallable<byte[]> serviceCallable, TableName tableName, final byte[] first,
- Collection<LoadQueueItem> lqis) throws IOException {
- int i = attemptedCalls.incrementAndGet();
- if (i == 1) {
- Connection errConn;
- try {
- errConn = getMockedConnection(util.getConfiguration());
- serviceCallable = this.buildClientServiceCallable(errConn, table, first, lqis, true);
- } catch (Exception e) {
- LOG.fatal("mocking cruft, should never happen", e);
- throw new RuntimeException("mocking cruft, should never happen");
- }
- failedCalls.incrementAndGet();
- return super.tryAtomicRegionLoad(serviceCallable, tableName, first, lqis);
- }
-
- return super.tryAtomicRegionLoad(serviceCallable, tableName, first, lqis);
- }
- };
- try {
- // create HFiles for different column families
- Path dir = buildBulkFiles(table, 1);
- try (Table t = connection.getTable(table);
- RegionLocator locator = connection.getRegionLocator(table);
- Admin admin = connection.getAdmin()) {
- lih.doBulkLoad(dir, admin, t, locator);
- }
- } finally {
- util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
- HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
- }
- fail("doBulkLoad should have thrown an exception");
- }
- }
-
- /**
- * Test showing that an exception thrown from the RS side results in the
- * expected number of retries set by {@link HConstants#HBASE_CLIENT_RETRIES_NUMBER}
- * when {@link LoadIncrementalHFiles#RETRY_ON_IO_EXCEPTION} is set.
- */
- @Test
- public void testRetryOnIOException() throws Exception {
- final TableName table = TableName.valueOf(name.getMethodName());
- final AtomicInteger calls = new AtomicInteger(1);
- final Connection conn = ConnectionFactory.createConnection(util
- .getConfiguration());
- util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
- util.getConfiguration().setBoolean(
- LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, true);
- final LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
- util.getConfiguration()) {
- @Override
- protected List<LoadQueueItem> tryAtomicRegionLoad(
- ClientServiceCallable<byte[]> serverCallable, TableName tableName,
- final byte[] first, Collection<LoadQueueItem> lqis)
- throws IOException {
- if (calls.getAndIncrement() < util.getConfiguration().getInt(
- HConstants.HBASE_CLIENT_RETRIES_NUMBER,
- HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) - 1) {
- ClientServiceCallable<byte[]> newServerCallable = new ClientServiceCallable<byte[]>(
- conn, tableName, first, new RpcControllerFactory(
- util.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) {
- @Override
- public byte[] rpcCall() throws Exception {
- throw new IOException("Error calling something on RegionServer");
- }
- };
- return super.tryAtomicRegionLoad(newServerCallable, tableName, first, lqis);
- } else {
- return super.tryAtomicRegionLoad(serverCallable, tableName, first, lqis);
- }
- }
- };
- setupTable(conn, table, 10);
- Path dir = buildBulkFiles(table, 1);
- lih.doBulkLoad(dir, conn.getAdmin(), conn.getTable(table),
- conn.getRegionLocator(table));
- util.getConfiguration().setBoolean(
- LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, false);
-
- }
-
- @SuppressWarnings("deprecation")
- private ClusterConnection getMockedConnection(final Configuration conf)
- throws IOException, org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException {
- ClusterConnection c = Mockito.mock(ClusterConnection.class);
- Mockito.when(c.getConfiguration()).thenReturn(conf);
- Mockito.doNothing().when(c).close();
- // Make it so we return a particular location when asked.
- final HRegionLocation loc = new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO,
- ServerName.valueOf("example.org", 1234, 0));
- Mockito.when(c.getRegionLocation((TableName) Mockito.any(),
- (byte[]) Mockito.any(), Mockito.anyBoolean())).
- thenReturn(loc);
- Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any())).
- thenReturn(loc);
- ClientProtos.ClientService.BlockingInterface hri =
- Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
- Mockito.when(hri.bulkLoadHFile((RpcController)Mockito.any(), (BulkLoadHFileRequest)Mockito.any())).
- thenThrow(new ServiceException(new IOException("injecting bulk load error")));
- Mockito.when(c.getClient(Mockito.any(ServerName.class))).
- thenReturn(hri);
- return c;
- }
-
- /**
- * This test exercises the path where there is a split after initial
- * validation but before the atomic bulk load call. We cannot use presplitting
- * to test this path, so we actually inject a split just before the atomic
- * region load.
- */
- @Test (timeout=120000)
- public void testSplitWhileBulkLoadPhase() throws Exception {
- final TableName table = TableName.valueOf(name.getMethodName());
- try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
- setupTable(connection, table, 10);
- populateTable(connection, table, 1);
- assertExpectedTable(table, ROWCOUNT, 1);
-
- // Now let's cause trouble. This will occur after the checks and cause bulk
- // files to fail when we attempt to atomically import them. This is recoverable.
- final AtomicInteger attemptedCalls = new AtomicInteger();
- LoadIncrementalHFiles lih2 = new LoadIncrementalHFiles(util.getConfiguration()) {
- @Override
- protected void bulkLoadPhase(final Table htable, final Connection conn,
- ExecutorService pool, Deque<LoadQueueItem> queue,
- final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile,
- Map<LoadQueueItem, ByteBuffer> item2RegionMap)
- throws IOException {
- int i = attemptedCalls.incrementAndGet();
- if (i == 1) {
- // On first attempt force a split.
- forceSplit(table);
- }
- super.bulkLoadPhase(htable, conn, pool, queue, regionGroups, copyFile, item2RegionMap);
- }
- };
-
- // create HFiles for different column families
- try (Table t = connection.getTable(table);
- RegionLocator locator = connection.getRegionLocator(table);
- Admin admin = connection.getAdmin()) {
- Path bulk = buildBulkFiles(table, 2);
- lih2.doBulkLoad(bulk, admin, t, locator);
- }
-
- // check that data was loaded
- // The three expected attempts are 1) failure because a split is needed,
- // 2) load of the split top, and 3) load of the split bottom.
- assertEquals(3, attemptedCalls.get());
- assertExpectedTable(table, ROWCOUNT, 2);
- }
- }
-
- /**
- * This test splits a table and attempts to bulk load. The bulk import files
- * should be split before atomically importing.
- */
- @Test (timeout=120000)
- public void testGroupOrSplitPresplit() throws Exception {
- final TableName table = TableName.valueOf(name.getMethodName());
- try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
- setupTable(connection, table, 10);
- populateTable(connection, table, 1);
- assertExpectedTable(connection, table, ROWCOUNT, 1);
- forceSplit(table);
-
- final AtomicInteger countedLqis = new AtomicInteger();
- LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
- util.getConfiguration()) {
- @Override
- protected Pair<List<LoadQueueItem>, String> groupOrSplit(
- Multimap<ByteBuffer, LoadQueueItem> regionGroups,
- final LoadQueueItem item, final Table htable,
- final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
- Pair<List<LoadQueueItem>, String> lqis = super.groupOrSplit(regionGroups, item, htable,
- startEndKeys);
- if (lqis != null && lqis.getFirst() != null) {
- countedLqis.addAndGet(lqis.getFirst().size());
- }
- return lqis;
- }
- };
-
- // create HFiles for different column families
- Path bulk = buildBulkFiles(table, 2);
- try (Table t = connection.getTable(table);
- RegionLocator locator = connection.getRegionLocator(table);
- Admin admin = connection.getAdmin()) {
- lih.doBulkLoad(bulk, admin, t, locator);
- }
- assertExpectedTable(connection, table, ROWCOUNT, 2);
- assertEquals(20, countedLqis.get());
- }
- }
-
- /**
- * This test creates a table with many small regions. The bulk load files
- * would be split multiple times before all of them can be loaded successfully.
- */
- @Test (timeout=120000)
- public void testSplitTmpFileCleanUp() throws Exception {
- final TableName table = TableName.valueOf(name.getMethodName());
- byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"),
- Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"),
- Bytes.toBytes("row_00000040"), Bytes.toBytes("row_00000050")};
- try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
- setupTableWithSplitkeys(table, 10, SPLIT_KEYS);
-
- LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration());
-
- // create HFiles
- Path bulk = buildBulkFiles(table, 2);
- try (Table t = connection.getTable(table);
- RegionLocator locator = connection.getRegionLocator(table);
- Admin admin = connection.getAdmin()) {
- lih.doBulkLoad(bulk, admin, t, locator);
- }
- // family path
- Path tmpPath = new Path(bulk, family(0));
- // TMP_DIR under family path
- tmpPath = new Path(tmpPath, LoadIncrementalHFiles.TMP_DIR);
- FileSystem fs = bulk.getFileSystem(util.getConfiguration());
- // HFiles have been split, so TMP_DIR exists
- assertTrue(fs.exists(tmpPath));
- // TMP_DIR should have been cleaned up
- assertNull(LoadIncrementalHFiles.TMP_DIR + " should be empty.",
- FSUtils.listStatus(fs, tmpPath));
- assertExpectedTable(connection, table, ROWCOUNT, 2);
- }
- }
-
- /**
- * This simulates a remote exception which should cause LIHF to exit with an
- * exception.
- */
- @Test(expected = IOException.class, timeout=120000)
- public void testGroupOrSplitFailure() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
- try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
- setupTable(connection, tableName, 10);
-
- LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
- util.getConfiguration()) {
- int i = 0;
-
- @Override
- protected Pair<List<LoadQueueItem>, String> groupOrSplit(
- Multimap<ByteBuffer, LoadQueueItem> regionGroups,
- final LoadQueueItem item, final Table table,
- final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
- i++;
-
- if (i == 5) {
- throw new IOException("failure");
- }
- return super.groupOrSplit(regionGroups, item, table, startEndKeys);
- }
- };
-
- // create HFiles for different column families
- Path dir = buildBulkFiles(tableName, 1);
- try (Table t = connection.getTable(tableName);
- RegionLocator locator = connection.getRegionLocator(tableName);
- Admin admin = connection.getAdmin()) {
- lih.doBulkLoad(dir, admin, t, locator);
- }
- }
-
- fail("doBulkLoad should have thrown an exception");
- }
-
- @Test (timeout=120000)
- public void testGroupOrSplitWhenRegionHoleExistsInMeta() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
- byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000100") };
- // Share the connection. We were failing to find the table with our new reverse scan because it
- // looks for the first region, not any region -- that is how it works now. The test below removes
- // the first region; previously we relied on the Connection cache holding the first region.
- Connection connection = ConnectionFactory.createConnection(util.getConfiguration());
- Table table = connection.getTable(tableName);
-
- setupTableWithSplitkeys(tableName, 10, SPLIT_KEYS);
- Path dir = buildBulkFiles(tableName, 2);
-
- final AtomicInteger countedLqis = new AtomicInteger();
- LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) {
-
- @Override
- protected Pair<List<LoadQueueItem>, String> groupOrSplit(
- Multimap<ByteBuffer, LoadQueueItem> regionGroups,
- final LoadQueueItem item, final Table htable,
- final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
- Pair<List<LoadQueueItem>, String> lqis = super.groupOrSplit(regionGroups, item, htable,
- startEndKeys);
- if (lqis != null && lqis.getFirst() != null) {
- countedLqis.addAndGet(lqis.getFirst().size());
- }
- return lqis;
- }
- };
-
- // do bulkload when there is no region hole in hbase:meta.
- try (Table t = connection.getTable(tableName);
- RegionLocator locator = connection.getRegionLocator(tableName);
- Admin admin = connection.getAdmin()) {
- loader.doBulkLoad(dir, admin, t, locator);
- } catch (Exception e) {
- LOG.error("exeception=", e);
- }
- // check if all the data are loaded into the table.
- this.assertExpectedTable(tableName, ROWCOUNT, 2);
-
- dir = buildBulkFiles(tableName, 3);
-
- // Mess it up by leaving a hole in the hbase:meta
- List<HRegionInfo> regionInfos = MetaTableAccessor.getTableRegions(connection, tableName);
- for (HRegionInfo regionInfo : regionInfos) {
- if (Bytes.equals(regionInfo.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
- MetaTableAccessor.deleteRegion(connection, regionInfo);
- break;
- }
- }
-
- try (Table t = connection.getTable(tableName);
- RegionLocator locator = connection.getRegionLocator(tableName);
- Admin admin = connection.getAdmin()) {
- loader.doBulkLoad(dir, admin, t, locator);
- } catch (Exception e) {
- LOG.error("exception=", e);
- assertTrue("IOException expected", e instanceof IOException);
- }
-
- table.close();
-
- // Make sure at least the one region that still exists can be found.
- regionInfos = MetaTableAccessor.getTableRegions(connection, tableName);
- assertTrue(regionInfos.size() >= 1);
-
- this.assertExpectedTable(connection, tableName, ROWCOUNT, 2);
- connection.close();
- }
-
- /**
- * Checks that all columns have the expected value and that there is the
- * expected number of rows.
- * @throws IOException
- */
- void assertExpectedTable(final Connection connection, TableName table, int count, int value)
- throws IOException {
- HTableDescriptor [] htds = util.getAdmin().listTables(table.getNameAsString());
- assertEquals(1, htds.length);
- Table t = null;
- try {
- t = connection.getTable(table);
- Scan s = new Scan();
- ResultScanner sr = t.getScanner(s);
- int i = 0;
- for (Result r : sr) {
- i++;
- for (NavigableMap<byte[], byte[]> nm : r.getNoVersionMap().values()) {
- for (byte[] val : nm.values()) {
- assertTrue(Bytes.equals(val, value(value)));
- }
- }
- }
- assertEquals(count, i);
- } catch (IOException e) {
- fail("Failed due to exception");
- } finally {
- if (t != null) t.close();
- }
- }
-}
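
The deleted suite presumably reappears under org.apache.hadoop.hbase.tool alongside TestLoadIncrementalHFiles, whose new location the TestIncrementalBackupWithBulkLoad hunk above already imports. Among other paths it covers the RETRY_ON_IO_EXCEPTION knob; a minimal sketch of turning that behavior on, assuming the tool-package copy keeps the same public constants the deleted test used:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;

public class RetryConfigSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Retry a failed atomic region load on IOException instead of aborting...
    conf.setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, true);
    // ...and bound the number of attempts with the standard client retry count.
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    // loader.doBulkLoad(...) as in the tests above.
  }
}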
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFiles.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFiles.java
deleted file mode 100644
index 78fddbc..0000000
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFiles.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Copyright The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.testclassification.MapReduceTests;
-import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.hbase.security.access.AccessControlLists;
-import org.apache.hadoop.hbase.security.access.SecureTestUtil;
-
-import org.junit.BeforeClass;
-import org.junit.experimental.categories.Category;
-
-/**
- * Reruns TestLoadIncrementalHFiles using LoadIncrementalHFiles in secure mode.
- * This suite is unable to verify the security handoff/turnover
- * as miniCluster is running as system user thus has root privileges
- * and delegation tokens don't seem to work on miniDFS.
- *
- * Thus SecureBulkload can only be completely verified by running
- * integration tests against a secure cluster. This suite is still
- * invaluable as it verifies the other mechanisms that need to be
- * supported as part of a LoadIncrementalHFiles call.
- */
-@Category({MapReduceTests.class, LargeTests.class})
-public class TestSecureLoadIncrementalHFiles extends TestLoadIncrementalHFiles{
-
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- // set the always on security provider
- UserProvider.setUserProviderForTesting(util.getConfiguration(),
- HadoopSecurityEnabledUserProviderForTesting.class);
- // setup configuration
- SecureTestUtil.enableSecurity(util.getConfiguration());
- util.getConfiguration().setInt(
- LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
- MAX_FILES_PER_REGION_PER_FAMILY);
- // change default behavior so that tag values are returned with normal rpcs
- util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
- KeyValueCodecWithTags.class.getCanonicalName());
-
- util.startMiniCluster();
-
- // Wait for the ACL table to become available
- util.waitTableEnabled(AccessControlLists.ACL_TABLE_NAME);
-
- setupNamespace();
- }
-
-}
-
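
For reference, the secure variant's @BeforeClass boils down to the steps below: install the always-on user provider, enable HBase security, bound files per region per family, and switch the RPC codec so tag values come back on normal RPCs. A condensed sketch mirroring the deleted setup code; the limit of 4 files is a hypothetical stand-in for the parent test's constant:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.access.AccessControlLists;
import org.apache.hadoop.hbase.security.access.SecureTestUtil;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;

public class SecureClusterSetupSketch {
  public static void setupSecureMiniCluster(HBaseTestingUtility util) throws Exception {
    Configuration conf = util.getConfiguration();
    // Pretend hadoop security is on so the secure bulk-load path is taken.
    UserProvider.setUserProviderForTesting(conf,
        HadoopSecurityEnabledUserProviderForTesting.class);
    SecureTestUtil.enableSecurity(conf);
    conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, 4);
    // Return tag values with normal RPCs so tests can assert on them.
    conf.set(HConstants.RPC_CODEC_CONF_KEY, KeyValueCodecWithTags.class.getCanonicalName());
    util.startMiniCluster();
    // The ACL table must be up before secure operations can be verified.
    util.waitTableEnabled(AccessControlLists.ACL_TABLE_NAME);
  }
}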
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFilesSplitRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFilesSplitRecovery.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFilesSplitRecovery.java
deleted file mode 100644
index 0e877ad..0000000
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFilesSplitRecovery.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.testclassification.MapReduceTests;
-import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.hbase.security.access.AccessControlLists;
-import org.apache.hadoop.hbase.security.access.SecureTestUtil;
-
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-
-/**
- * Reruns TestLoadIncrementalHFilesSplitRecovery
- * using LoadIncrementalHFiles in secure mode.
- * This suite is unable to verify the security handoff/turnover
- * as miniCluster is running as system user thus has root privileges
- * and delegation tokens don't seem to work on miniDFS.
- *
- * Thus SecureBulkload can only be completely verified by running
- * integration tests against a secure cluster. This suite is still
- * invaluable as it verifies the other mechanisms that need to be
- * supported as part of a LoadIncrementalHFiles call.
- */
-@Category({MapReduceTests.class, LargeTests.class})
-public class TestSecureLoadIncrementalHFilesSplitRecovery extends TestLoadIncrementalHFilesSplitRecovery {
-
- // This "overrides" the parent static method;
- // make sure the two stay in sync.
- @BeforeClass
- public static void setupCluster() throws Exception {
- util = new HBaseTestingUtility();
- // set the always on security provider
- UserProvider.setUserProviderForTesting(util.getConfiguration(),
- HadoopSecurityEnabledUserProviderForTesting.class);
- // setup configuration
- SecureTestUtil.enableSecurity(util.getConfiguration());
-
- util.startMiniCluster();
-
- // Wait for the ACL table to become available
- util.waitTableEnabled(AccessControlLists.ACL_TABLE_NAME);
- }
-
- // Disabling this test as it does not work in secure mode.
- @Test (timeout=180000)
- @Override
- public void testBulkLoadPhaseFailure() {
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestMobSecureExportSnapshot.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestMobSecureExportSnapshot.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestMobSecureExportSnapshot.java
index 98d03c0..df9f4ff 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestMobSecureExportSnapshot.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestMobSecureExportSnapshot.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.snapshot;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
-import org.apache.hadoop.hbase.mapreduce.HadoopSecurityEnabledUserProviderForTesting;
+import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.access.AccessControlLists;
import org.apache.hadoop.hbase.security.access.SecureTestUtil;
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestSecureExportSnapshot.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestSecureExportSnapshot.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestSecureExportSnapshot.java
index 7d4832c..def0838 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestSecureExportSnapshot.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestSecureExportSnapshot.java
@@ -20,7 +20,7 @@
package org.apache.hadoop.hbase.snapshot;
import org.apache.hadoop.hbase.CategoryBasedTimeout;
-import org.apache.hadoop.hbase.mapreduce.HadoopSecurityEnabledUserProviderForTesting;
+import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.access.AccessControlLists;
import org.apache.hadoop.hbase.security.access.SecureTestUtil;