You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2016/01/19 22:33:48 UTC

tajo git commit: TAJO-1921: Hbase Storage can cause NPE when the hbase cluster is restarted

Repository: tajo
Updated Branches:
  refs/heads/master 6c87e8e9d -> 72ebc43d6


TAJO-1921: Hbase Storage can cause NPE when the hbase cluster is restarted

Closes #918


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/72ebc43d
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/72ebc43d
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/72ebc43d

Branch: refs/heads/master
Commit: 72ebc43d6e685dad58b2bf238fae12e51c8dec96
Parents: 6c87e8e
Author: combineads <co...@combineads.co.kr>
Authored: Tue Jan 19 13:31:23 2016 -0800
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Tue Jan 19 13:33:19 2016 -0800

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../tajo/engine/query/TestHBaseTable.java       |  56 ++++++++++
 .../testGetSplitsWhenRestartHBase.result        | 102 +++++++++++++++++++
 .../tajo/storage/hbase/HBaseTablespace.java     |  49 ++++-----
 4 files changed, 180 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/72ebc43d/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 3d49873..f680eb0 100644
--- a/CHANGES
+++ b/CHANGES
@@ -90,6 +90,9 @@ Release 0.12.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1921: Hbase Storage can cause NPE when the hbase cluster is restarted.
+    (Byunghwa Yun via hyunsik)
+
     TAJO-2038: NPE in FileScanner#getProgress. (jinho)
 
     TAJO-2034: Files required for executing python functions are not copied in 

http://git-wip-us.apache.org/repos/asf/tajo/blob/72ebc43d/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
index 13b7711..97feb65 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
@@ -26,10 +26,13 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.*;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.InclusiveStopFilter;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.tajo.IntegrationTest;
 import org.apache.tajo.QueryTestCaseBase;
 import org.apache.tajo.TajoTestingCluster;
@@ -1376,6 +1379,59 @@ public class TestHBaseTable extends QueryTestCaseBase {
     }
   }
 
+  @Test
+  public void testGetSplitsWhenRestartHBase() throws Exception {
+    executeString("CREATE TABLE hbase_mapped_table1 (rk text, col1 text, col2 text, col3 int) "
+      + "TABLESPACE cluster1 USING hbase WITH ('table'='hbase_table1', 'columns'=':key,col1:a,col2:,col3:#b', "
+      + "'hbase.split.rowkeys'='010,020,030,040,050,060,070,080,090')").close();
+    // Regression test for TAJO-1921: the same scan must work before and after region servers change.
+    assertTableExists("hbase_mapped_table1");
+    HBaseAdmin hAdmin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf());
+    HTable htable = null;
+    try {
+      hAdmin.tableExists("hbase_table1");
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table1");
+      org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys();
+      assertEquals(10, keys.getFirst().length);
+      // Nine split keys yield ten regions; load rows "000".."099" across them.
+      DecimalFormat df = new DecimalFormat("000");
+      for (int i = 0; i < 100; i++) {
+        Put put = new Put(String.valueOf(df.format(i)).getBytes());
+        put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes());
+        put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
+        put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes());
+        put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes());
+        put.add("col3".getBytes(), "".getBytes(), Bytes.toBytes(i));
+        htable.put(put);
+      }
+      // Baseline: the full scan succeeds while the original region server is still running.
+      ResultSet res = executeString("select * from hbase_mapped_table1");
+      assertResultSet(res);
+      res.close();
+      // Start three additional region servers, then stop the original one.
+      MiniHBaseCluster cluster = testingCluster.getHBaseUtil().getMiniHBaseCluster();
+      HMaster master = cluster.getMaster();
+      master.balanceSwitch(true);
+      assertEquals(1, cluster.getLiveRegionServerThreads().size());
+      HRegionServer orgRegionServer = cluster.getLiveRegionServerThreads().get(0).getRegionServer();
+      cluster.startRegionServer().waitForServerOnline();
+      cluster.startRegionServer().waitForServerOnline();
+      cluster.startRegionServer().waitForServerOnline();
+      cluster.stopRegionServer(orgRegionServer.getServerName());
+      cluster.waitForRegionServerToStop(orgRegionServer.getServerName(), 1000);
+      // The same scan must still return every row once the original server is gone.
+      res = executeString("select * from hbase_mapped_table1");
+      assertResultSet(res);
+      res.close();
+    } finally {
+      executeString("DROP TABLE hbase_mapped_table1 PURGE").close();
+      hAdmin.close();
+      if (htable != null) { // htable stays null if the HTable constructor threw; close only a real handle
+        htable.close();
+      }
+    }
+  }
+
   private String resultSetToString(ResultScanner scanner, byte[][] cfNames, byte[][] qualifiers,
                                    boolean [] binaries, Schema schema) throws Exception {
     StringBuilder sb = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/tajo/blob/72ebc43d/tajo-core-tests/src/test/resources/results/TestHBaseTable/testGetSplitsWhenRestartHBase.result
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/results/TestHBaseTable/testGetSplitsWhenRestartHBase.result b/tajo-core-tests/src/test/resources/results/TestHBaseTable/testGetSplitsWhenRestartHBase.result
new file mode 100644
index 0000000..4f5fd8f
--- /dev/null
+++ b/tajo-core-tests/src/test/resources/results/TestHBaseTable/testGetSplitsWhenRestartHBase.result
@@ -0,0 +1,102 @@
+rk,col1,col2,col3
+-------------------------------
+000,a-0,{"k1":"k1-0", "k2":"k2-0"},0
+001,a-1,{"k1":"k1-1", "k2":"k2-1"},1
+002,a-2,{"k1":"k1-2", "k2":"k2-2"},2
+003,a-3,{"k1":"k1-3", "k2":"k2-3"},3
+004,a-4,{"k1":"k1-4", "k2":"k2-4"},4
+005,a-5,{"k1":"k1-5", "k2":"k2-5"},5
+006,a-6,{"k1":"k1-6", "k2":"k2-6"},6
+007,a-7,{"k1":"k1-7", "k2":"k2-7"},7
+008,a-8,{"k1":"k1-8", "k2":"k2-8"},8
+009,a-9,{"k1":"k1-9", "k2":"k2-9"},9
+010,a-10,{"k1":"k1-10", "k2":"k2-10"},10
+011,a-11,{"k1":"k1-11", "k2":"k2-11"},11
+012,a-12,{"k1":"k1-12", "k2":"k2-12"},12
+013,a-13,{"k1":"k1-13", "k2":"k2-13"},13
+014,a-14,{"k1":"k1-14", "k2":"k2-14"},14
+015,a-15,{"k1":"k1-15", "k2":"k2-15"},15
+016,a-16,{"k1":"k1-16", "k2":"k2-16"},16
+017,a-17,{"k1":"k1-17", "k2":"k2-17"},17
+018,a-18,{"k1":"k1-18", "k2":"k2-18"},18
+019,a-19,{"k1":"k1-19", "k2":"k2-19"},19
+020,a-20,{"k1":"k1-20", "k2":"k2-20"},20
+021,a-21,{"k1":"k1-21", "k2":"k2-21"},21
+022,a-22,{"k1":"k1-22", "k2":"k2-22"},22
+023,a-23,{"k1":"k1-23", "k2":"k2-23"},23
+024,a-24,{"k1":"k1-24", "k2":"k2-24"},24
+025,a-25,{"k1":"k1-25", "k2":"k2-25"},25
+026,a-26,{"k1":"k1-26", "k2":"k2-26"},26
+027,a-27,{"k1":"k1-27", "k2":"k2-27"},27
+028,a-28,{"k1":"k1-28", "k2":"k2-28"},28
+029,a-29,{"k1":"k1-29", "k2":"k2-29"},29
+030,a-30,{"k1":"k1-30", "k2":"k2-30"},30
+031,a-31,{"k1":"k1-31", "k2":"k2-31"},31
+032,a-32,{"k1":"k1-32", "k2":"k2-32"},32
+033,a-33,{"k1":"k1-33", "k2":"k2-33"},33
+034,a-34,{"k1":"k1-34", "k2":"k2-34"},34
+035,a-35,{"k1":"k1-35", "k2":"k2-35"},35
+036,a-36,{"k1":"k1-36", "k2":"k2-36"},36
+037,a-37,{"k1":"k1-37", "k2":"k2-37"},37
+038,a-38,{"k1":"k1-38", "k2":"k2-38"},38
+039,a-39,{"k1":"k1-39", "k2":"k2-39"},39
+040,a-40,{"k1":"k1-40", "k2":"k2-40"},40
+041,a-41,{"k1":"k1-41", "k2":"k2-41"},41
+042,a-42,{"k1":"k1-42", "k2":"k2-42"},42
+043,a-43,{"k1":"k1-43", "k2":"k2-43"},43
+044,a-44,{"k1":"k1-44", "k2":"k2-44"},44
+045,a-45,{"k1":"k1-45", "k2":"k2-45"},45
+046,a-46,{"k1":"k1-46", "k2":"k2-46"},46
+047,a-47,{"k1":"k1-47", "k2":"k2-47"},47
+048,a-48,{"k1":"k1-48", "k2":"k2-48"},48
+049,a-49,{"k1":"k1-49", "k2":"k2-49"},49
+050,a-50,{"k1":"k1-50", "k2":"k2-50"},50
+051,a-51,{"k1":"k1-51", "k2":"k2-51"},51
+052,a-52,{"k1":"k1-52", "k2":"k2-52"},52
+053,a-53,{"k1":"k1-53", "k2":"k2-53"},53
+054,a-54,{"k1":"k1-54", "k2":"k2-54"},54
+055,a-55,{"k1":"k1-55", "k2":"k2-55"},55
+056,a-56,{"k1":"k1-56", "k2":"k2-56"},56
+057,a-57,{"k1":"k1-57", "k2":"k2-57"},57
+058,a-58,{"k1":"k1-58", "k2":"k2-58"},58
+059,a-59,{"k1":"k1-59", "k2":"k2-59"},59
+060,a-60,{"k1":"k1-60", "k2":"k2-60"},60
+061,a-61,{"k1":"k1-61", "k2":"k2-61"},61
+062,a-62,{"k1":"k1-62", "k2":"k2-62"},62
+063,a-63,{"k1":"k1-63", "k2":"k2-63"},63
+064,a-64,{"k1":"k1-64", "k2":"k2-64"},64
+065,a-65,{"k1":"k1-65", "k2":"k2-65"},65
+066,a-66,{"k1":"k1-66", "k2":"k2-66"},66
+067,a-67,{"k1":"k1-67", "k2":"k2-67"},67
+068,a-68,{"k1":"k1-68", "k2":"k2-68"},68
+069,a-69,{"k1":"k1-69", "k2":"k2-69"},69
+070,a-70,{"k1":"k1-70", "k2":"k2-70"},70
+071,a-71,{"k1":"k1-71", "k2":"k2-71"},71
+072,a-72,{"k1":"k1-72", "k2":"k2-72"},72
+073,a-73,{"k1":"k1-73", "k2":"k2-73"},73
+074,a-74,{"k1":"k1-74", "k2":"k2-74"},74
+075,a-75,{"k1":"k1-75", "k2":"k2-75"},75
+076,a-76,{"k1":"k1-76", "k2":"k2-76"},76
+077,a-77,{"k1":"k1-77", "k2":"k2-77"},77
+078,a-78,{"k1":"k1-78", "k2":"k2-78"},78
+079,a-79,{"k1":"k1-79", "k2":"k2-79"},79
+080,a-80,{"k1":"k1-80", "k2":"k2-80"},80
+081,a-81,{"k1":"k1-81", "k2":"k2-81"},81
+082,a-82,{"k1":"k1-82", "k2":"k2-82"},82
+083,a-83,{"k1":"k1-83", "k2":"k2-83"},83
+084,a-84,{"k1":"k1-84", "k2":"k2-84"},84
+085,a-85,{"k1":"k1-85", "k2":"k2-85"},85
+086,a-86,{"k1":"k1-86", "k2":"k2-86"},86
+087,a-87,{"k1":"k1-87", "k2":"k2-87"},87
+088,a-88,{"k1":"k1-88", "k2":"k2-88"},88
+089,a-89,{"k1":"k1-89", "k2":"k2-89"},89
+090,a-90,{"k1":"k1-90", "k2":"k2-90"},90
+091,a-91,{"k1":"k1-91", "k2":"k2-91"},91
+092,a-92,{"k1":"k1-92", "k2":"k2-92"},92
+093,a-93,{"k1":"k1-93", "k2":"k2-93"},93
+094,a-94,{"k1":"k1-94", "k2":"k2-94"},94
+095,a-95,{"k1":"k1-95", "k2":"k2-95"},95
+096,a-96,{"k1":"k1-96", "k2":"k2-96"},96
+097,a-97,{"k1":"k1-97", "k2":"k2-97"},97
+098,a-98,{"k1":"k1-98", "k2":"k2-98"},98
+099,a-99,{"k1":"k1-99", "k2":"k2-99"},99
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/72ebc43d/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
index a7aa11b..1b81531 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.util.RegionSizeCalculator;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
@@ -421,10 +422,10 @@ public class HBaseTablespace extends Tablespace {
 
     List<IndexPredication> indexPredications = getIndexPredications(columnMapping, tableDesc, filterCondition);
     HTable htable = null;
-    HBaseAdmin hAdmin = null;
 
     try {
       htable = new HTable(hbaseConf, tableDesc.getMeta().getProperty(HBaseStorageConstants.META_TABLE_KEY));
+      RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(htable);
 
       org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys();
       if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
@@ -433,12 +434,18 @@ public class HBaseTablespace extends Tablespace {
           throw new IOException("Expecting at least one region.");
         }
         List<Fragment> fragments = new ArrayList<>(1);
-        Fragment fragment = new HBaseFragment(
+        HBaseFragment fragment = new HBaseFragment(
             tableDesc.getUri(),
             inputSourceId, htable.getName().getNameAsString(),
             HConstants.EMPTY_BYTE_ARRAY,
             HConstants.EMPTY_BYTE_ARRAY,
             regLoc.getHostname());
+        long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName());
+        if (regionSize == 0) {
+          fragment.setLength(TajoConstants.UNKNOWN_LENGTH);
+        } else {
+          fragment.setLength(regionSize);
+        }
         fragments.add(fragment);
         return fragments;
       }
@@ -473,13 +480,14 @@ public class HBaseTablespace extends Tablespace {
         stopRows  = EMPTY_END_ROW_KEY;
       }
 
-      hAdmin =  new HBaseAdmin(hbaseConf);
-      Map<ServerName, ServerLoad> serverLoadMap = new HashMap<>();
-
+      // reference: org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(JobContext)
       // region startkey -> HBaseFragment
       Map<byte[], HBaseFragment> fragmentMap = new HashMap<>();
       for (int i = 0; i < keys.getFirst().length; i++) {
         HRegionLocation location = htable.getRegionLocation(keys.getFirst()[i], false);
+        if (null == location) {
+          throw new IOException("Can't find the region of the key: " +  Bytes.toStringBinary(keys.getFirst()[i]));
+        }
 
         byte[] regionStartKey = keys.getFirst()[i];
         byte[] regionStopKey = keys.getSecond()[i];
@@ -497,14 +505,6 @@ public class HBaseTablespace extends Tablespace {
             byte[] fragmentStop = (stopRow.length == 0 || Bytes.compareTo(regionStopKey, stopRow) <= 0) &&
                 regionStopKey.length > 0 ? regionStopKey : stopRow;
 
-            String regionName = location.getRegionInfo().getRegionNameAsString();
-
-            ServerLoad serverLoad = serverLoadMap.get(location.getServerName());
-            if (serverLoad == null) {
-              serverLoad = hAdmin.getClusterStatus().getLoad(location.getServerName());
-              serverLoadMap.put(location.getServerName(), serverLoad);
-            }
-
             if (fragmentMap.containsKey(regionStartKey)) {
               HBaseFragment prevFragment = fragmentMap.get(regionStartKey);
               if (Bytes.compareTo(fragmentStart, prevFragment.getStartRow()) < 0) {
@@ -514,27 +514,19 @@ public class HBaseTablespace extends Tablespace {
                 prevFragment.setStopRow(fragmentStop);
               }
             } else {
+              byte[] regionName = location.getRegionInfo().getRegionName();
+              long regionSize = sizeCalculator.getRegionSize(regionName);
+
               HBaseFragment fragment = new HBaseFragment(tableDesc.getUri(),
                   inputSourceId,
                   htable.getName().getNameAsString(),
                   fragmentStart,
                   fragmentStop,
                   location.getHostname());
-
-              // get region size
-              boolean foundLength = false;
-              for (Map.Entry<byte[], RegionLoad> entry : serverLoad.getRegionsLoad().entrySet()) {
-                if (regionName.equals(Bytes.toString(entry.getKey()))) {
-                  RegionLoad regionLoad = entry.getValue();
-                  long storeFileSize = (regionLoad.getStorefileSizeMB() + regionLoad.getMemStoreSizeMB()) * 1024L * 1024L;
-                  fragment.setLength(storeFileSize);
-                  foundLength = true;
-                  break;
-                }
-              }
-
-              if (!foundLength) {
+              if (regionSize == 0) {
                 fragment.setLength(TajoConstants.UNKNOWN_LENGTH);
+              } else {
+                fragment.setLength(regionSize);
               }
 
               fragmentMap.put(regionStartKey, fragment);
@@ -557,9 +549,6 @@ public class HBaseTablespace extends Tablespace {
       if (htable != null) {
         htable.close();
       }
-      if (hAdmin != null) {
-        hAdmin.close();
-      }
     }
   }