You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2015/08/13 02:39:50 UTC

[1/6] hbase git commit: HBASE-14196 Thrift server idle connection timeout issue (Vladimir Rodionov)

Repository: hbase
Updated Branches:
  refs/heads/0.98 87729ccfa -> a705a7484
  refs/heads/branch-1 ba4cc6321 -> e8b5e922c
  refs/heads/branch-1.0 b99b56715 -> a81d0a4a7
  refs/heads/branch-1.1 aa6460539 -> 5627e0020
  refs/heads/branch-1.2 2a5b5c791 -> 93d6fbe92
  refs/heads/master 5e5bcceb5 -> 643ba9018


HBASE-14196 Thrift server idle connection timeout issue (Vladimir Rodionov)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/643ba901
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/643ba901
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/643ba901

Branch: refs/heads/master
Commit: 643ba90185f20419016080d6d32adba9fe7019dd
Parents: 5e5bcce
Author: Andrew Purtell <ap...@apache.org>
Authored: Wed Aug 12 16:32:37 2015 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Wed Aug 12 16:32:37 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/thrift/IncrementCoalescer.java |   8 +-
 .../hadoop/hbase/thrift/ThriftServerRunner.java | 137 ++++++++++++++-----
 2 files changed, 109 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/643ba901/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
----------------------------------------------------------------------
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
index 13a2e50..e937f2d 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
@@ -264,8 +264,9 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
           if (counter == null) {
             continue;
           }
+          Table table = null;
           try {
-            Table table = handler.getTable(row.getTable());
+            table = handler.getTable(row.getTable());
             if (failures > 2) {
               throw new IOException("Auto-Fail rest of ICVs");
             }
@@ -278,8 +279,11 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
                 + Bytes.toStringBinary(row.getRowKey()) + ", "
                 + Bytes.toStringBinary(row.getFamily()) + ", "
                 + Bytes.toStringBinary(row.getQualifier()) + ", " + counter, e);
+          } finally{
+            if(table != null){
+              table.close();
+            }
           }
-
         }
         return failures;
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/643ba901/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
----------------------------------------------------------------------
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
index a5239ed..668aeb6 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
@@ -638,15 +638,6 @@ public class ThriftServerRunner implements Runnable {
     private ThriftMetrics metrics = null;
 
     private final ConnectionCache connectionCache;
-
-    private static ThreadLocal<Map<String, Table>> threadLocalTables =
-        new ThreadLocal<Map<String, Table>>() {
-      @Override
-      protected Map<String, Table> initialValue() {
-        return new TreeMap<String, Table>();
-      }
-    };
-
     IncrementCoalescer coalescer = null;
 
     static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
@@ -679,11 +670,7 @@ public class ThriftServerRunner implements Runnable {
     public Table getTable(final byte[] tableName) throws
         IOException {
       String table = Bytes.toString(tableName);
-      Map<String, Table> tables = threadLocalTables.get();
-      if (!tables.containsKey(table)) {
-        tables.put(table, (Table)connectionCache.getTable(table));
-      }
-      return tables.get(table);
+      return connectionCache.getTable(table);
     }
 
     public Table getTable(final ByteBuffer tableName) throws IOException {
@@ -879,8 +866,9 @@ public class ThriftServerRunner implements Runnable {
                               byte[] family,
                               byte[] qualifier,
                               Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Get get = new Get(getBytes(row));
         addAttributes(get, attributes);
         if (qualifier == null) {
@@ -893,6 +881,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally {
+        closeTable(table);
       }
     }
 
@@ -920,8 +910,10 @@ public class ThriftServerRunner implements Runnable {
      */
     public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family,
         byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Get get = new Get(getBytes(row));
         addAttributes(get, attributes);
         if (null == qualifier) {
@@ -935,6 +927,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -962,8 +956,10 @@ public class ThriftServerRunner implements Runnable {
     protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family,
         byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
         throws IOError {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Get get = new Get(getBytes(row));
         addAttributes(get, attributes);
         if (null == qualifier) {
@@ -978,6 +974,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1010,8 +1008,10 @@ public class ThriftServerRunner implements Runnable {
     public List<TRowResult> getRowWithColumnsTs(
         ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         if (columns == null) {
           Get get = new Get(getBytes(row));
           addAttributes(get, attributes);
@@ -1035,6 +1035,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1072,9 +1074,11 @@ public class ThriftServerRunner implements Runnable {
                                                  List<ByteBuffer> rows,
         List<ByteBuffer> columns, long timestamp,
         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      
+      Table table= null;
       try {
         List<Get> gets = new ArrayList<Get>(rows.size());
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         if (metrics != null) {
           metrics.incNumRowKeysInBatchGet(rows.size());
         }
@@ -1100,6 +1104,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1117,8 +1123,9 @@ public class ThriftServerRunner implements Runnable {
                             ByteBuffer row,
                             ByteBuffer column,
         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Delete delete  = new Delete(getBytes(row));
         addAttributes(delete, attributes);
         byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
@@ -1132,6 +1139,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally {
+        closeTable(table);
       }
     }
 
@@ -1146,14 +1155,17 @@ public class ThriftServerRunner implements Runnable {
     public void deleteAllRowTs(
         ByteBuffer tableName, ByteBuffer row, long timestamp,
         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Delete delete  = new Delete(getBytes(row), timestamp);
         addAttributes(delete, attributes);
         table.delete(delete);
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally {
+        closeTable(table);
       }
     }
 
@@ -1260,6 +1272,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IllegalArgumentException e) {
         LOG.warn(e.getMessage(), e);
         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1331,6 +1345,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IllegalArgumentException e) {
         LOG.warn(e.getMessage(), e);
         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1348,7 +1364,7 @@ public class ThriftServerRunner implements Runnable {
     protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
         byte [] family, byte [] qualifier, long amount)
         throws IOError, IllegalArgument, TException {
-      Table table;
+      Table table = null;
       try {
         table = getTable(tableName);
         return table.incrementColumnValue(
@@ -1356,6 +1372,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally {
+        closeTable(table);
       }
     }
 
@@ -1405,8 +1423,10 @@ public class ThriftServerRunner implements Runnable {
     public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
         Map<ByteBuffer, ByteBuffer> attributes)
         throws IOError {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Scan scan = new Scan();
         addAttributes(scan, attributes);
         if (tScan.isSetStartRow()) {
@@ -1446,6 +1466,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1453,8 +1475,10 @@ public class ThriftServerRunner implements Runnable {
     public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
         List<ByteBuffer> columns,
         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Scan scan = new Scan(getBytes(startRow));
         addAttributes(scan, attributes);
         if(columns != null && columns.size() != 0) {
@@ -1471,6 +1495,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1479,8 +1505,10 @@ public class ThriftServerRunner implements Runnable {
         ByteBuffer stopRow, List<ByteBuffer> columns,
         Map<ByteBuffer, ByteBuffer> attributes)
         throws IOError, TException {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
         addAttributes(scan, attributes);
         if(columns != null && columns.size() != 0) {
@@ -1497,6 +1525,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1506,8 +1536,10 @@ public class ThriftServerRunner implements Runnable {
                                      List<ByteBuffer> columns,
         Map<ByteBuffer, ByteBuffer> attributes)
         throws IOError, TException {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Scan scan = new Scan(getBytes(startAndPrefix));
         addAttributes(scan, attributes);
         Filter f = new WhileMatchFilter(
@@ -1527,6 +1559,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1534,8 +1568,10 @@ public class ThriftServerRunner implements Runnable {
     public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
         List<ByteBuffer> columns, long timestamp,
         Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Scan scan = new Scan(getBytes(startRow));
         addAttributes(scan, attributes);
         scan.setTimeRange(0, timestamp);
@@ -1553,6 +1589,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1561,8 +1599,10 @@ public class ThriftServerRunner implements Runnable {
         ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp,
         Map<ByteBuffer, ByteBuffer> attributes)
         throws IOError, TException {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
         addAttributes(scan, attributes);
         scan.setTimeRange(0, timestamp);
@@ -1581,17 +1621,21 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
     @Override
     public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
         ByteBuffer tableName) throws IOError, TException {
+      
+      Table table = null;
       try {
         TreeMap<ByteBuffer, ColumnDescriptor> columns =
           new TreeMap<ByteBuffer, ColumnDescriptor>();
 
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         HTableDescriptor desc = table.getTableDescriptor();
 
         for (HColumnDescriptor e : desc.getFamilies()) {
@@ -1602,9 +1646,23 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally {
+        closeTable(table);
       }
     }
-
+    
+    private void closeTable(Table table) throws IOError
+    {
+      try{
+        if(table != null){
+          table.close();
+        }
+      } catch (IOException e){
+        LOG.error(e.getMessage(), e);
+        throw new IOError(Throwables.getStackTraceAsString(e));
+      }
+    }
+    
     @Override
     public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
       try {
@@ -1650,10 +1708,13 @@ public class ThriftServerRunner implements Runnable {
       scan.setReversed(true);
       scan.addFamily(family);
       scan.setStartRow(row);
-
-      Table table = getTable(tableName);
+      Table table = getTable(tableName);      
       try (ResultScanner scanner = table.getScanner(scan)) {
         return scanner.next();
+      } finally{
+        if(table != null){
+          table.close();
+        }
       }
     }
 
@@ -1673,13 +1734,16 @@ public class ThriftServerRunner implements Runnable {
         return;
       }
 
+      Table table = null;
       try {
-        Table table = getTable(tincrement.getTable());
+        table = getTable(tincrement.getTable());
         Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
         table.increment(inc);
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1700,14 +1764,17 @@ public class ThriftServerRunner implements Runnable {
         throw new TException("Must supply a table and a row key; can't append");
       }
 
+      Table table = null;
       try {
-        Table table = getTable(tappend.getTable());
+        table = getTable(tappend.getTable());
         Append append = ThriftUtilities.appendFromThrift(tappend);
         Result result = table.append(append);
         return ThriftUtilities.cellFromHBase(result.rawCells());
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+          closeTable(table);
       }
     }
 
@@ -1743,6 +1810,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IllegalArgumentException e) {
         LOG.warn(e.getMessage(), e);
         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
+      } finally {
+          closeTable(table);
       }
     }
   }


[2/6] hbase git commit: HBASE-14196 Thrift server idle connection timeout issue (Vladimir Rodionov)

Posted by ap...@apache.org.
HBASE-14196 Thrift server idle connection timeout issue (Vladimir Rodionov)

Conflicts:
	hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e8b5e922
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e8b5e922
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e8b5e922

Branch: refs/heads/branch-1
Commit: e8b5e922cb1741ccc4954ef2ba0f97b5d6d39079
Parents: ba4cc63
Author: Andrew Purtell <ap...@apache.org>
Authored: Wed Aug 12 16:32:37 2015 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Wed Aug 12 16:40:58 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/thrift/IncrementCoalescer.java |   8 +-
 .../hadoop/hbase/thrift/ThriftServerRunner.java | 135 ++++++++++++++-----
 2 files changed, 108 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e8b5e922/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
----------------------------------------------------------------------
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
index 13a2e50..e937f2d 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
@@ -264,8 +264,9 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
           if (counter == null) {
             continue;
           }
+          Table table = null;
           try {
-            Table table = handler.getTable(row.getTable());
+            table = handler.getTable(row.getTable());
             if (failures > 2) {
               throw new IOException("Auto-Fail rest of ICVs");
             }
@@ -278,8 +279,11 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
                 + Bytes.toStringBinary(row.getRowKey()) + ", "
                 + Bytes.toStringBinary(row.getFamily()) + ", "
                 + Bytes.toStringBinary(row.getQualifier()) + ", " + counter, e);
+          } finally{
+            if(table != null){
+              table.close();
+            }
           }
-
         }
         return failures;
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e8b5e922/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
----------------------------------------------------------------------
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
index a71bcf9..4b66a7f 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
@@ -638,15 +638,6 @@ public class ThriftServerRunner implements Runnable {
     private ThriftMetrics metrics = null;
 
     private final ConnectionCache connectionCache;
-
-    private static ThreadLocal<Map<String, Table>> threadLocalTables =
-        new ThreadLocal<Map<String, Table>>() {
-      @Override
-      protected Map<String, Table> initialValue() {
-        return new TreeMap<String, Table>();
-      }
-    };
-
     IncrementCoalescer coalescer = null;
 
     static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
@@ -679,11 +670,7 @@ public class ThriftServerRunner implements Runnable {
     public Table getTable(final byte[] tableName) throws
         IOException {
       String table = Bytes.toString(tableName);
-      Map<String, Table> tables = threadLocalTables.get();
-      if (!tables.containsKey(table)) {
-        tables.put(table, (Table)connectionCache.getTable(table));
-      }
-      return tables.get(table);
+      return connectionCache.getTable(table);
     }
 
     public Table getTable(final ByteBuffer tableName) throws IOException {
@@ -879,8 +866,9 @@ public class ThriftServerRunner implements Runnable {
                               byte[] family,
                               byte[] qualifier,
                               Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Get get = new Get(getBytes(row));
         addAttributes(get, attributes);
         if (qualifier == null) {
@@ -893,6 +881,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally {
+        closeTable(table);
       }
     }
 
@@ -920,8 +910,10 @@ public class ThriftServerRunner implements Runnable {
      */
     public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family,
         byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Get get = new Get(getBytes(row));
         addAttributes(get, attributes);
         if (null == qualifier) {
@@ -935,6 +927,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -962,8 +956,10 @@ public class ThriftServerRunner implements Runnable {
     protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family,
         byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
         throws IOError {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Get get = new Get(getBytes(row));
         addAttributes(get, attributes);
         if (null == qualifier) {
@@ -978,6 +974,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1010,8 +1008,10 @@ public class ThriftServerRunner implements Runnable {
     public List<TRowResult> getRowWithColumnsTs(
         ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         if (columns == null) {
           Get get = new Get(getBytes(row));
           addAttributes(get, attributes);
@@ -1035,6 +1035,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1072,9 +1074,11 @@ public class ThriftServerRunner implements Runnable {
                                                  List<ByteBuffer> rows,
         List<ByteBuffer> columns, long timestamp,
         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      
+      Table table= null;
       try {
         List<Get> gets = new ArrayList<Get>(rows.size());
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         if (metrics != null) {
           metrics.incNumRowKeysInBatchGet(rows.size());
         }
@@ -1100,6 +1104,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1117,8 +1123,9 @@ public class ThriftServerRunner implements Runnable {
                             ByteBuffer row,
                             ByteBuffer column,
         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Delete delete  = new Delete(getBytes(row));
         addAttributes(delete, attributes);
         byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
@@ -1132,6 +1139,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally {
+        closeTable(table);
       }
     }
 
@@ -1146,14 +1155,17 @@ public class ThriftServerRunner implements Runnable {
     public void deleteAllRowTs(
         ByteBuffer tableName, ByteBuffer row, long timestamp,
         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Delete delete  = new Delete(getBytes(row), timestamp);
         addAttributes(delete, attributes);
         table.delete(delete);
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally {
+        closeTable(table);
       }
     }
 
@@ -1260,6 +1272,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IllegalArgumentException e) {
         LOG.warn(e.getMessage(), e);
         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1331,6 +1345,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IllegalArgumentException e) {
         LOG.warn(e.getMessage(), e);
         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1348,7 +1364,7 @@ public class ThriftServerRunner implements Runnable {
     protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
         byte [] family, byte [] qualifier, long amount)
         throws IOError, IllegalArgument, TException {
-      Table table;
+      Table table = null;
       try {
         table = getTable(tableName);
         return table.incrementColumnValue(
@@ -1356,6 +1372,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally {
+        closeTable(table);
       }
     }
 
@@ -1405,8 +1423,10 @@ public class ThriftServerRunner implements Runnable {
     public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
         Map<ByteBuffer, ByteBuffer> attributes)
         throws IOError {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Scan scan = new Scan();
         addAttributes(scan, attributes);
         if (tScan.isSetStartRow()) {
@@ -1446,6 +1466,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1453,8 +1475,10 @@ public class ThriftServerRunner implements Runnable {
     public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
         List<ByteBuffer> columns,
         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Scan scan = new Scan(getBytes(startRow));
         addAttributes(scan, attributes);
         if(columns != null && columns.size() != 0) {
@@ -1471,6 +1495,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1479,8 +1505,10 @@ public class ThriftServerRunner implements Runnable {
         ByteBuffer stopRow, List<ByteBuffer> columns,
         Map<ByteBuffer, ByteBuffer> attributes)
         throws IOError, TException {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
         addAttributes(scan, attributes);
         if(columns != null && columns.size() != 0) {
@@ -1497,6 +1525,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1506,8 +1536,10 @@ public class ThriftServerRunner implements Runnable {
                                      List<ByteBuffer> columns,
         Map<ByteBuffer, ByteBuffer> attributes)
         throws IOError, TException {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Scan scan = new Scan(getBytes(startAndPrefix));
         addAttributes(scan, attributes);
         Filter f = new WhileMatchFilter(
@@ -1527,6 +1559,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1534,8 +1568,10 @@ public class ThriftServerRunner implements Runnable {
     public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
         List<ByteBuffer> columns, long timestamp,
         Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Scan scan = new Scan(getBytes(startRow));
         addAttributes(scan, attributes);
         scan.setTimeRange(0, timestamp);
@@ -1553,6 +1589,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1561,8 +1599,10 @@ public class ThriftServerRunner implements Runnable {
         ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp,
         Map<ByteBuffer, ByteBuffer> attributes)
         throws IOError, TException {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
         addAttributes(scan, attributes);
         scan.setTimeRange(0, timestamp);
@@ -1581,17 +1621,21 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
     @Override
     public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
         ByteBuffer tableName) throws IOError, TException {
+      
+      Table table = null;
       try {
         TreeMap<ByteBuffer, ColumnDescriptor> columns =
           new TreeMap<ByteBuffer, ColumnDescriptor>();
 
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         HTableDescriptor desc = table.getTableDescriptor();
 
         for (HColumnDescriptor e : desc.getFamilies()) {
@@ -1602,6 +1646,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally {
+        closeTable(table);
       }
     }
 
@@ -1657,15 +1703,30 @@ public class ThriftServerRunner implements Runnable {
       }
     }
 
+    private void closeTable(Table table) throws IOError
+    {
+      try{
+        if(table != null){
+          table.close();
+        }
+      } catch (IOException e){
+        LOG.error(e.getMessage(), e);
+        throw new IOError(Throwables.getStackTraceAsString(e));
+      }
+    }
+    
     private Result getRowOrBefore(byte[] tableName, byte[] row, byte[] family) throws IOException {
       Scan scan = new Scan(row);
       scan.setReversed(true);
       scan.addFamily(family);
       scan.setStartRow(row);
-
-      Table table = getTable(tableName);
+      Table table = getTable(tableName);      
       try (ResultScanner scanner = table.getScanner(scan)) {
         return scanner.next();
+      } finally{
+        if(table != null){
+          table.close();
+        }
       }
     }
 
@@ -1685,13 +1746,16 @@ public class ThriftServerRunner implements Runnable {
         return;
       }
 
+      Table table = null;
       try {
-        Table table = getTable(tincrement.getTable());
+        table = getTable(tincrement.getTable());
         Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
         table.increment(inc);
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1712,14 +1776,17 @@ public class ThriftServerRunner implements Runnable {
         throw new TException("Must supply a table and a row key; can't append");
       }
 
+      Table table = null;
       try {
-        Table table = getTable(tappend.getTable());
+        table = getTable(tappend.getTable());
         Append append = ThriftUtilities.appendFromThrift(tappend);
         Result result = table.append(append);
         return ThriftUtilities.cellFromHBase(result.rawCells());
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+          closeTable(table);
       }
     }
 
@@ -1755,6 +1822,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IllegalArgumentException e) {
         LOG.warn(e.getMessage(), e);
         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
+      } finally {
+          closeTable(table);
       }
     }
   }


[4/6] hbase git commit: HBASE-14196 Thrift server idle connection timeout issue (Vladimir Rodionov)

Posted by ap...@apache.org.
HBASE-14196 Thrift server idle connection timeout issue (Vladimir Rodionov)

Conflicts:
	hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5627e002
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5627e002
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5627e002

Branch: refs/heads/branch-1.1
Commit: 5627e0020e59d3bf77d1a83b57037d9837c7b99f
Parents: aa64605
Author: Andrew Purtell <ap...@apache.org>
Authored: Wed Aug 12 16:32:37 2015 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Wed Aug 12 16:41:13 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/thrift/IncrementCoalescer.java |   8 +-
 .../hadoop/hbase/thrift/ThriftServerRunner.java | 135 ++++++++++++++-----
 2 files changed, 108 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/5627e002/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
----------------------------------------------------------------------
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
index bdbe445..ab1cbee 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
@@ -264,8 +264,9 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
           if (counter == null) {
             continue;
           }
+          Table table = null;
           try {
-            Table table = handler.getTable(row.getTable());
+            table = handler.getTable(row.getTable());
             if (failures > 2) {
               throw new IOException("Auto-Fail rest of ICVs");
             }
@@ -278,8 +279,11 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
                 + Bytes.toStringBinary(row.getRowKey()) + ", "
                 + Bytes.toStringBinary(row.getFamily()) + ", "
                 + Bytes.toStringBinary(row.getQualifier()) + ", " + counter, e);
+          } finally{
+            if(table != null){
+              table.close();
+            }
           }
-
         }
         return failures;
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5627e002/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
----------------------------------------------------------------------
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
index 9cfaf3f..109e874 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
@@ -639,15 +639,6 @@ public class ThriftServerRunner implements Runnable {
     private ThriftMetrics metrics = null;
 
     private final ConnectionCache connectionCache;
-
-    private static ThreadLocal<Map<String, Table>> threadLocalTables =
-        new ThreadLocal<Map<String, Table>>() {
-      @Override
-      protected Map<String, Table> initialValue() {
-        return new TreeMap<String, Table>();
-      }
-    };
-
     IncrementCoalescer coalescer = null;
 
     static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
@@ -681,11 +672,7 @@ public class ThriftServerRunner implements Runnable {
     public Table getTable(final byte[] tableName) throws
         IOException {
       String table = Bytes.toString(tableName);
-      Map<String, Table> tables = threadLocalTables.get();
-      if (!tables.containsKey(table)) {
-        tables.put(table, (Table)connectionCache.getTable(table));
-      }
-      return tables.get(table);
+      return connectionCache.getTable(table);
     }
 
     public Table getTable(final ByteBuffer tableName) throws IOException {
@@ -882,8 +869,9 @@ public class ThriftServerRunner implements Runnable {
                               byte[] family,
                               byte[] qualifier,
                               Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Get get = new Get(getBytes(row));
         addAttributes(get, attributes);
         if (qualifier == null) {
@@ -896,6 +884,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally {
+        closeTable(table);
       }
     }
 
@@ -924,8 +914,10 @@ public class ThriftServerRunner implements Runnable {
      */
     public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family,
         byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Get get = new Get(getBytes(row));
         addAttributes(get, attributes);
         if (null == qualifier) {
@@ -939,6 +931,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -967,8 +961,10 @@ public class ThriftServerRunner implements Runnable {
     protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family,
         byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
         throws IOError {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Get get = new Get(getBytes(row));
         addAttributes(get, attributes);
         if (null == qualifier) {
@@ -983,6 +979,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1015,8 +1013,10 @@ public class ThriftServerRunner implements Runnable {
     public List<TRowResult> getRowWithColumnsTs(
         ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         if (columns == null) {
           Get get = new Get(getBytes(row));
           addAttributes(get, attributes);
@@ -1040,6 +1040,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1077,9 +1079,11 @@ public class ThriftServerRunner implements Runnable {
                                                  List<ByteBuffer> rows,
         List<ByteBuffer> columns, long timestamp,
         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      
+      Table table= null;
       try {
         List<Get> gets = new ArrayList<Get>(rows.size());
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         if (metrics != null) {
           metrics.incNumRowKeysInBatchGet(rows.size());
         }
@@ -1105,6 +1109,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1122,8 +1128,9 @@ public class ThriftServerRunner implements Runnable {
                             ByteBuffer row,
                             ByteBuffer column,
         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Delete delete  = new Delete(getBytes(row));
         addAttributes(delete, attributes);
         byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
@@ -1137,6 +1144,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally {
+        closeTable(table);
       }
     }
 
@@ -1151,14 +1160,17 @@ public class ThriftServerRunner implements Runnable {
     public void deleteAllRowTs(
         ByteBuffer tableName, ByteBuffer row, long timestamp,
         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Delete delete  = new Delete(getBytes(row), timestamp);
         addAttributes(delete, attributes);
         table.delete(delete);
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally {
+        closeTable(table);
       }
     }
 
@@ -1265,6 +1277,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IllegalArgumentException e) {
         LOG.warn(e.getMessage(), e);
         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1336,6 +1350,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IllegalArgumentException e) {
         LOG.warn(e.getMessage(), e);
         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1354,7 +1370,7 @@ public class ThriftServerRunner implements Runnable {
     protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
         byte [] family, byte [] qualifier, long amount)
         throws IOError, IllegalArgument, TException {
-      Table table;
+      Table table = null;
       try {
         table = getTable(tableName);
         return table.incrementColumnValue(
@@ -1362,6 +1378,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally {
+        closeTable(table);
       }
     }
 
@@ -1411,8 +1429,10 @@ public class ThriftServerRunner implements Runnable {
     public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
         Map<ByteBuffer, ByteBuffer> attributes)
         throws IOError {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Scan scan = new Scan();
         addAttributes(scan, attributes);
         if (tScan.isSetStartRow()) {
@@ -1452,6 +1472,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1459,8 +1481,10 @@ public class ThriftServerRunner implements Runnable {
     public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
         List<ByteBuffer> columns,
         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Scan scan = new Scan(getBytes(startRow));
         addAttributes(scan, attributes);
         if(columns != null && columns.size() != 0) {
@@ -1477,6 +1501,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1485,8 +1511,10 @@ public class ThriftServerRunner implements Runnable {
         ByteBuffer stopRow, List<ByteBuffer> columns,
         Map<ByteBuffer, ByteBuffer> attributes)
         throws IOError, TException {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
         addAttributes(scan, attributes);
         if(columns != null && columns.size() != 0) {
@@ -1503,6 +1531,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1512,8 +1542,10 @@ public class ThriftServerRunner implements Runnable {
                                      List<ByteBuffer> columns,
         Map<ByteBuffer, ByteBuffer> attributes)
         throws IOError, TException {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Scan scan = new Scan(getBytes(startAndPrefix));
         addAttributes(scan, attributes);
         Filter f = new WhileMatchFilter(
@@ -1533,6 +1565,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1540,8 +1574,10 @@ public class ThriftServerRunner implements Runnable {
     public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
         List<ByteBuffer> columns, long timestamp,
         Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Scan scan = new Scan(getBytes(startRow));
         addAttributes(scan, attributes);
         scan.setTimeRange(0, timestamp);
@@ -1559,6 +1595,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1567,8 +1605,10 @@ public class ThriftServerRunner implements Runnable {
         ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp,
         Map<ByteBuffer, ByteBuffer> attributes)
         throws IOError, TException {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
         addAttributes(scan, attributes);
         scan.setTimeRange(0, timestamp);
@@ -1587,17 +1627,21 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
     @Override
     public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
         ByteBuffer tableName) throws IOError, TException {
+      
+      Table table = null;
       try {
         TreeMap<ByteBuffer, ColumnDescriptor> columns =
           new TreeMap<ByteBuffer, ColumnDescriptor>();
 
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         HTableDescriptor desc = table.getTableDescriptor();
 
         for (HColumnDescriptor e : desc.getFamilies()) {
@@ -1608,6 +1652,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally {
+        closeTable(table);
       }
     }
 
@@ -1663,15 +1709,30 @@ public class ThriftServerRunner implements Runnable {
       }
     }
 
+    private void closeTable(Table table) throws IOError
+    {
+      try{
+        if(table != null){
+          table.close();
+        }
+      } catch (IOException e){
+        LOG.error(e.getMessage(), e);
+        throw new IOError(Throwables.getStackTraceAsString(e));
+      }
+    }
+    
     private Result getRowOrBefore(byte[] tableName, byte[] row, byte[] family) throws IOException {
       Scan scan = new Scan(row);
       scan.setReversed(true);
       scan.addFamily(family);
       scan.setStartRow(row);
-
-      Table table = getTable(tableName);
+      Table table = getTable(tableName);      
       try (ResultScanner scanner = table.getScanner(scan)) {
         return scanner.next();
+      } finally{
+        if(table != null){
+          table.close();
+        }
       }
     }
 
@@ -1691,13 +1752,16 @@ public class ThriftServerRunner implements Runnable {
         return;
       }
 
+      Table table = null;
       try {
-        Table table = getTable(tincrement.getTable());
+        table = getTable(tincrement.getTable());
         Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
         table.increment(inc);
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1718,14 +1782,17 @@ public class ThriftServerRunner implements Runnable {
         throw new TException("Must supply a table and a row key; can't append");
       }
 
+      Table table = null;
       try {
-        Table table = getTable(tappend.getTable());
+        table = getTable(tappend.getTable());
         Append append = ThriftUtilities.appendFromThrift(tappend);
         Result result = table.append(append);
         return ThriftUtilities.cellFromHBase(result.rawCells());
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+          closeTable(table);
       }
     }
 
@@ -1761,6 +1828,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IllegalArgumentException e) {
         LOG.warn(e.getMessage(), e);
         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
+      } finally {
+          closeTable(table);
       }
     }
   }


[6/6] hbase git commit: HBASE-14196 Thrift server idle connection timeout issue (Vladimir Rodionov)

Posted by ap...@apache.org.
HBASE-14196 Thrift server idle connection timeout issue (Vladimir Rodionov)

Conflicts:
	hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
	hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a705a748
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a705a748
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a705a748

Branch: refs/heads/0.98
Commit: a705a7484637c94c1d4c3555c3888cf3908611ce
Parents: 87729cc
Author: Andrew Purtell <ap...@apache.org>
Authored: Wed Aug 12 16:32:37 2015 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Wed Aug 12 17:05:12 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/thrift/IncrementCoalescer.java |  11 +-
 .../hadoop/hbase/thrift/ThriftServerRunner.java | 176 ++++++++++++-------
 2 files changed, 122 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/a705a748/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
----------------------------------------------------------------------
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
index bb45c88..78a5874 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
@@ -35,7 +35,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.thrift.ThriftServerRunner.HBaseHandler;
 import org.apache.hadoop.hbase.thrift.generated.TIncrement;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -163,7 +163,6 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
 
   protected final Log LOG = LogFactory.getLog(this.getClass().getName());
 
-  @SuppressWarnings("deprecation")
   public IncrementCoalescer(HBaseHandler hand) {
     this.handler = hand;
     LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
@@ -264,8 +263,9 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
           if (counter == null) {
             continue;
           }
+          HTableInterface table = null;
           try {
-            HTable table = handler.getTable(row.getTable());
+            table = handler.getTable(row.getTable());
             if (failures > 2) {
               throw new IOException("Auto-Fail rest of ICVs");
             }
@@ -278,8 +278,11 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
                 + Bytes.toStringBinary(row.getRowKey()) + ", "
                 + Bytes.toStringBinary(row.getFamily()) + ", "
                 + Bytes.toStringBinary(row.getQualifier()) + ", " + counter, e);
+          } finally{
+            if(table != null){
+              table.close();
+            }
           }
-
         }
         return failures;
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a705a748/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
----------------------------------------------------------------------
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
index e245309..a73ec40 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.OperationWithAttributes;
 import org.apache.hadoop.hbase.client.Put;
@@ -116,6 +117,7 @@ import org.apache.thrift.transport.TServerTransport;
 import org.apache.thrift.transport.TTransportFactory;
 
 import com.google.common.base.Joiner;
+import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
@@ -538,15 +540,6 @@ public class ThriftServerRunner implements Runnable {
     private ThriftMetrics metrics = null;
 
     private final ConnectionCache connectionCache;
-
-    private static ThreadLocal<Map<String, HTable>> threadLocalTables =
-        new ThreadLocal<Map<String, HTable>>() {
-      @Override
-      protected Map<String, HTable> initialValue() {
-        return new TreeMap<String, HTable>();
-      }
-    };
-
     IncrementCoalescer coalescer = null;
 
     static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
@@ -577,17 +570,13 @@ public class ThriftServerRunner implements Runnable {
      * @throws IOException
      * @throws IOError
      */
-    public HTable getTable(final byte[] tableName) throws
+    public HTableInterface getTable(final byte[] tableName) throws
         IOException {
       String table = Bytes.toString(tableName);
-      Map<String, HTable> tables = threadLocalTables.get();
-      if (!tables.containsKey(table)) {
-        tables.put(table, (HTable)connectionCache.getTable(table));
-      }
-      return tables.get(table);
+      return connectionCache.getTable(table);
     }
 
-    public HTable getTable(final ByteBuffer tableName) throws IOException {
+    public HTableInterface getTable(final ByteBuffer tableName) throws IOException {
       return getTable(getBytes(tableName));
     }
 
@@ -726,14 +715,13 @@ public class ThriftServerRunner implements Runnable {
     public List<TRegionInfo> getTableRegions(ByteBuffer tableName)
     throws IOError {
       try {
-        HTable table;
+        HTableInterface table;
         try {
           table = getTable(tableName);
         } catch (TableNotFoundException ex) {
           return new ArrayList<TRegionInfo>();
         }
-        Map<HRegionInfo, ServerName> regionLocations =
-            table.getRegionLocations();
+        Map<HRegionInfo, ServerName> regionLocations = ((HTable)table).getRegionLocations();
         List<TRegionInfo> results = new ArrayList<TRegionInfo>();
         for (Map.Entry<HRegionInfo, ServerName> entry :
             regionLocations.entrySet()) {
@@ -788,8 +776,9 @@ public class ThriftServerRunner implements Runnable {
                               byte[] family,
                               byte[] qualifier,
                               Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      HTableInterface table = null;
       try {
-        HTable table = getTable(tableName);
+        table = getTable(tableName);
         Get get = new Get(getBytes(row));
         addAttributes(get, attributes);
         if (qualifier == null) {
@@ -801,7 +790,9 @@ public class ThriftServerRunner implements Runnable {
         return ThriftUtilities.cellFromHBase(result.rawCells());
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
-        throw new IOError(e.getMessage());
+        throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally {
+        closeTable(table);
       }
     }
 
@@ -830,8 +821,9 @@ public class ThriftServerRunner implements Runnable {
      */
     public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family,
         byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      HTableInterface table = null;
       try {
-        HTable table = getTable(tableName);
+        table = getTable(tableName);
         Get get = new Get(getBytes(row));
         addAttributes(get, attributes);
         if (null == qualifier) {
@@ -844,7 +836,9 @@ public class ThriftServerRunner implements Runnable {
         return ThriftUtilities.cellFromHBase(result.rawCells());
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
-        throw new IOError(e.getMessage());
+        throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -873,8 +867,9 @@ public class ThriftServerRunner implements Runnable {
     protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family,
         byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
         throws IOError {
+      HTableInterface table = null;
       try {
-        HTable table = getTable(tableName);
+        table = getTable(tableName);
         Get get = new Get(getBytes(row));
         addAttributes(get, attributes);
         if (null == qualifier) {
@@ -888,7 +883,9 @@ public class ThriftServerRunner implements Runnable {
         return ThriftUtilities.cellFromHBase(result.rawCells());
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
-        throw new IOError(e.getMessage());
+        throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -921,8 +918,9 @@ public class ThriftServerRunner implements Runnable {
     public List<TRowResult> getRowWithColumnsTs(
         ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      HTableInterface table = null;
       try {
-        HTable table = getTable(tableName);
+        table = getTable(tableName);
         if (columns == null) {
           Get get = new Get(getBytes(row));
           addAttributes(get, attributes);
@@ -945,7 +943,9 @@ public class ThriftServerRunner implements Runnable {
         return ThriftUtilities.rowResultFromHBase(result);
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
-        throw new IOError(e.getMessage());
+        throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -983,9 +983,10 @@ public class ThriftServerRunner implements Runnable {
                                                  List<ByteBuffer> rows,
         List<ByteBuffer> columns, long timestamp,
         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      HTableInterface table = null;
       try {
         List<Get> gets = new ArrayList<Get>(rows.size());
-        HTable table = getTable(tableName);
+        table = getTable(tableName);
         if (metrics != null) {
           metrics.incNumRowKeysInBatchGet(rows.size());
         }
@@ -1010,7 +1011,9 @@ public class ThriftServerRunner implements Runnable {
         return ThriftUtilities.rowResultFromHBase(result);
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
-        throw new IOError(e.getMessage());
+        throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1028,8 +1031,9 @@ public class ThriftServerRunner implements Runnable {
                             ByteBuffer row,
                             ByteBuffer column,
         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      HTableInterface table = null;
       try {
-        HTable table = getTable(tableName);
+        table = getTable(tableName);
         Delete delete  = new Delete(getBytes(row));
         addAttributes(delete, attributes);
         byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
@@ -1042,7 +1046,9 @@ public class ThriftServerRunner implements Runnable {
 
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
-        throw new IOError(e.getMessage());
+        throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally {
+        closeTable(table);
       }
     }
 
@@ -1057,14 +1063,17 @@ public class ThriftServerRunner implements Runnable {
     public void deleteAllRowTs(
         ByteBuffer tableName, ByteBuffer row, long timestamp,
         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      HTableInterface table = null;
       try {
-        HTable table = getTable(tableName);
+        table = getTable(tableName);
         Delete delete  = new Delete(getBytes(row), timestamp);
         addAttributes(delete, attributes);
         table.delete(delete);
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
-        throw new IOError(e.getMessage());
+        throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally {
+        closeTable(table);
       }
     }
 
@@ -1122,7 +1131,7 @@ public class ThriftServerRunner implements Runnable {
         List<Mutation> mutations, long timestamp,
         Map<ByteBuffer, ByteBuffer> attributes)
         throws IOError, IllegalArgument {
-      HTable table = null;
+      HTableInterface table = null;
       try {
         table = getTable(tableName);
         Put put = new Put(getBytes(row), timestamp);
@@ -1166,7 +1175,9 @@ public class ThriftServerRunner implements Runnable {
         throw new IOError(e.getMessage());
       } catch (IllegalArgumentException e) {
         LOG.warn(e.getMessage(), e);
-        throw new IllegalArgument(e.getMessage());
+        throw new IllegalArgument(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1223,8 +1234,7 @@ public class ThriftServerRunner implements Runnable {
         if (!put.isEmpty())
           puts.add(put);
       }
-
-      HTable table = null;
+      HTableInterface table = null;
       try {
         table = getTable(tableName);
         if (!puts.isEmpty())
@@ -1237,7 +1247,9 @@ public class ThriftServerRunner implements Runnable {
         throw new IOError(e.getMessage());
       } catch (IllegalArgumentException e) {
         LOG.warn(e.getMessage(), e);
-        throw new IllegalArgument(e.getMessage());
+        throw new IllegalArgument(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1256,14 +1268,16 @@ public class ThriftServerRunner implements Runnable {
     protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
         byte [] family, byte [] qualifier, long amount)
         throws IOError, IllegalArgument, TException {
-      HTable table;
+      HTableInterface table = null;
       try {
         table = getTable(tableName);
         return table.incrementColumnValue(
             getBytes(row), family, qualifier, amount);
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
-        throw new IOError(e.getMessage());
+        throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally {
+        closeTable(table);
       }
     }
 
@@ -1313,8 +1327,9 @@ public class ThriftServerRunner implements Runnable {
     public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
         Map<ByteBuffer, ByteBuffer> attributes)
         throws IOError {
+      HTableInterface table = null;
       try {
-        HTable table = getTable(tableName);
+        table = getTable(tableName);
         Scan scan = new Scan();
         addAttributes(scan, attributes);
         if (tScan.isSetStartRow()) {
@@ -1353,7 +1368,9 @@ public class ThriftServerRunner implements Runnable {
         return addScanner(table.getScanner(scan), tScan.sortColumns);
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
-        throw new IOError(e.getMessage());
+        throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1361,8 +1378,9 @@ public class ThriftServerRunner implements Runnable {
     public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
         List<ByteBuffer> columns,
         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      HTableInterface table = null;
       try {
-        HTable table = getTable(tableName);
+        table = getTable(tableName);
         Scan scan = new Scan(getBytes(startRow));
         addAttributes(scan, attributes);
         if(columns != null && columns.size() != 0) {
@@ -1378,7 +1396,9 @@ public class ThriftServerRunner implements Runnable {
         return addScanner(table.getScanner(scan), false);
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
-        throw new IOError(e.getMessage());
+        throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1387,8 +1407,9 @@ public class ThriftServerRunner implements Runnable {
         ByteBuffer stopRow, List<ByteBuffer> columns,
         Map<ByteBuffer, ByteBuffer> attributes)
         throws IOError, TException {
+      HTableInterface table = null;
       try {
-        HTable table = getTable(tableName);
+        table = getTable(tableName);
         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
         addAttributes(scan, attributes);
         if(columns != null && columns.size() != 0) {
@@ -1404,7 +1425,9 @@ public class ThriftServerRunner implements Runnable {
         return addScanner(table.getScanner(scan), false);
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
-        throw new IOError(e.getMessage());
+        throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1414,8 +1437,9 @@ public class ThriftServerRunner implements Runnable {
                                      List<ByteBuffer> columns,
         Map<ByteBuffer, ByteBuffer> attributes)
         throws IOError, TException {
+      HTableInterface table = null;
       try {
-        HTable table = getTable(tableName);
+        table = getTable(tableName);
         Scan scan = new Scan(getBytes(startAndPrefix));
         addAttributes(scan, attributes);
         Filter f = new WhileMatchFilter(
@@ -1434,7 +1458,9 @@ public class ThriftServerRunner implements Runnable {
         return addScanner(table.getScanner(scan), false);
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
-        throw new IOError(e.getMessage());
+        throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1442,8 +1468,9 @@ public class ThriftServerRunner implements Runnable {
     public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
         List<ByteBuffer> columns, long timestamp,
         Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
+      HTableInterface table = null;
       try {
-        HTable table = getTable(tableName);
+        table = getTable(tableName);
         Scan scan = new Scan(getBytes(startRow));
         addAttributes(scan, attributes);
         scan.setTimeRange(0, timestamp);
@@ -1460,7 +1487,9 @@ public class ThriftServerRunner implements Runnable {
         return addScanner(table.getScanner(scan), false);
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
-        throw new IOError(e.getMessage());
+        throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1469,8 +1498,9 @@ public class ThriftServerRunner implements Runnable {
         ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp,
         Map<ByteBuffer, ByteBuffer> attributes)
         throws IOError, TException {
+      HTableInterface table = null;
       try {
-        HTable table = getTable(tableName);
+        table = getTable(tableName);
         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
         addAttributes(scan, attributes);
         scan.setTimeRange(0, timestamp);
@@ -1488,18 +1518,21 @@ public class ThriftServerRunner implements Runnable {
         return addScanner(table.getScanner(scan), false);
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
-        throw new IOError(e.getMessage());
+        throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
     @Override
     public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
         ByteBuffer tableName) throws IOError, TException {
+      HTableInterface table = null;
       try {
         TreeMap<ByteBuffer, ColumnDescriptor> columns =
           new TreeMap<ByteBuffer, ColumnDescriptor>();
 
-        HTable table = getTable(tableName);
+        table = getTable(tableName);
         HTableDescriptor desc = table.getTableDescriptor();
 
         for (HColumnDescriptor e : desc.getFamilies()) {
@@ -1509,27 +1542,33 @@ public class ThriftServerRunner implements Runnable {
         return columns;
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
-        throw new IOError(e.getMessage());
+        throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally {
+        closeTable(table);
       }
     }
 
     @Override
     public List<TCell> getRowOrBefore(ByteBuffer tableName, ByteBuffer row,
         ByteBuffer family) throws IOError {
+      HTableInterface table = null;
       try {
-        HTable table = getTable(getBytes(tableName));
+        table = getTable(getBytes(tableName));
         Result result = table.getRowOrBefore(getBytes(row), getBytes(family));
         return ThriftUtilities.cellFromHBase(result.rawCells());
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(e.getMessage());
+      } finally {
+        closeTable(table);
       }
     }
 
     @Override
     public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
+      HTableInterface table = null;
       try {
-        HTable table = getTable(TableName.META_TABLE_NAME.getName());
+        table = getTable(TableName.META_TABLE_NAME.getName());
         byte[] row = getBytes(searchRow);
         Result startRowResult = table.getRowOrBefore(
           row, HConstants.CATALOG_FAMILY);
@@ -1562,7 +1601,21 @@ public class ThriftServerRunner implements Runnable {
         return region;
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
-        throw new IOError(e.getMessage());
+        throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally {
+        closeTable(table);
+      }
+    }
+
+    private void closeTable(HTableInterface table) throws IOError
+    {
+      try{
+        if(table != null){
+          table.close();
+        }
+      } catch (IOException e){
+        LOG.error(e.getMessage(), e);
+        throw new IOError(Throwables.getStackTraceAsString(e));
       }
     }
 
@@ -1582,13 +1635,16 @@ public class ThriftServerRunner implements Runnable {
         return;
       }
 
+      HTableInterface table = null;
       try {
-        HTable table = getTable(tincrement.getTable());
+        table = getTable(tincrement.getTable());
         Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
         table.increment(inc);
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
-        throw new IOError(e.getMessage());
+        throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1604,8 +1660,6 @@ public class ThriftServerRunner implements Runnable {
     }
   }
 
-
-
   /**
    * Adds all the attributes into the Operation object
    */


[5/6] hbase git commit: HBASE-14196 Thrift server idle connection timeout issue (Vladimir Rodionov)

Posted by ap...@apache.org.
HBASE-14196 Thrift server idle connection timeout issue (Vladimir Rodionov)

Conflicts:
	hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a81d0a4a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a81d0a4a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a81d0a4a

Branch: refs/heads/branch-1.0
Commit: a81d0a4a7b9792681b12b5f7a7fc59aff70ae884
Parents: b99b567
Author: Andrew Purtell <ap...@apache.org>
Authored: Wed Aug 12 16:32:37 2015 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Wed Aug 12 16:41:17 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/thrift/IncrementCoalescer.java |   8 +-
 .../hadoop/hbase/thrift/ThriftServerRunner.java | 135 ++++++++++++++-----
 2 files changed, 108 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/a81d0a4a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
----------------------------------------------------------------------
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
index bdbe445..ab1cbee 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
@@ -264,8 +264,9 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
           if (counter == null) {
             continue;
           }
+          Table table = null;
           try {
-            Table table = handler.getTable(row.getTable());
+            table = handler.getTable(row.getTable());
             if (failures > 2) {
               throw new IOException("Auto-Fail rest of ICVs");
             }
@@ -278,8 +279,11 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
                 + Bytes.toStringBinary(row.getRowKey()) + ", "
                 + Bytes.toStringBinary(row.getFamily()) + ", "
                 + Bytes.toStringBinary(row.getQualifier()) + ", " + counter, e);
+          } finally{
+            if(table != null){
+              table.close();
+            }
           }
-
         }
         return failures;
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a81d0a4a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
----------------------------------------------------------------------
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
index 9cfaf3f..109e874 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
@@ -639,15 +639,6 @@ public class ThriftServerRunner implements Runnable {
     private ThriftMetrics metrics = null;
 
     private final ConnectionCache connectionCache;
-
-    private static ThreadLocal<Map<String, Table>> threadLocalTables =
-        new ThreadLocal<Map<String, Table>>() {
-      @Override
-      protected Map<String, Table> initialValue() {
-        return new TreeMap<String, Table>();
-      }
-    };
-
     IncrementCoalescer coalescer = null;
 
     static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
@@ -681,11 +672,7 @@ public class ThriftServerRunner implements Runnable {
     public Table getTable(final byte[] tableName) throws
         IOException {
       String table = Bytes.toString(tableName);
-      Map<String, Table> tables = threadLocalTables.get();
-      if (!tables.containsKey(table)) {
-        tables.put(table, (Table)connectionCache.getTable(table));
-      }
-      return tables.get(table);
+      return connectionCache.getTable(table);
     }
 
     public Table getTable(final ByteBuffer tableName) throws IOException {
@@ -882,8 +869,9 @@ public class ThriftServerRunner implements Runnable {
                               byte[] family,
                               byte[] qualifier,
                               Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Get get = new Get(getBytes(row));
         addAttributes(get, attributes);
         if (qualifier == null) {
@@ -896,6 +884,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally {
+        closeTable(table);
       }
     }
 
@@ -924,8 +914,10 @@ public class ThriftServerRunner implements Runnable {
      */
     public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family,
         byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Get get = new Get(getBytes(row));
         addAttributes(get, attributes);
         if (null == qualifier) {
@@ -939,6 +931,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -967,8 +961,10 @@ public class ThriftServerRunner implements Runnable {
     protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family,
         byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
         throws IOError {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Get get = new Get(getBytes(row));
         addAttributes(get, attributes);
         if (null == qualifier) {
@@ -983,6 +979,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1015,8 +1013,10 @@ public class ThriftServerRunner implements Runnable {
     public List<TRowResult> getRowWithColumnsTs(
         ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         if (columns == null) {
           Get get = new Get(getBytes(row));
           addAttributes(get, attributes);
@@ -1040,6 +1040,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1077,9 +1079,11 @@ public class ThriftServerRunner implements Runnable {
                                                  List<ByteBuffer> rows,
         List<ByteBuffer> columns, long timestamp,
         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      
+      Table table= null;
       try {
         List<Get> gets = new ArrayList<Get>(rows.size());
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         if (metrics != null) {
           metrics.incNumRowKeysInBatchGet(rows.size());
         }
@@ -1105,6 +1109,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1122,8 +1128,9 @@ public class ThriftServerRunner implements Runnable {
                             ByteBuffer row,
                             ByteBuffer column,
         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Delete delete  = new Delete(getBytes(row));
         addAttributes(delete, attributes);
         byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
@@ -1137,6 +1144,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally {
+        closeTable(table);
       }
     }
 
@@ -1151,14 +1160,17 @@ public class ThriftServerRunner implements Runnable {
     public void deleteAllRowTs(
         ByteBuffer tableName, ByteBuffer row, long timestamp,
         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Delete delete  = new Delete(getBytes(row), timestamp);
         addAttributes(delete, attributes);
         table.delete(delete);
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally {
+        closeTable(table);
       }
     }
 
@@ -1265,6 +1277,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IllegalArgumentException e) {
         LOG.warn(e.getMessage(), e);
         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1336,6 +1350,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IllegalArgumentException e) {
         LOG.warn(e.getMessage(), e);
         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1354,7 +1370,7 @@ public class ThriftServerRunner implements Runnable {
     protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
         byte [] family, byte [] qualifier, long amount)
         throws IOError, IllegalArgument, TException {
-      Table table;
+      Table table = null;
       try {
         table = getTable(tableName);
         return table.incrementColumnValue(
@@ -1362,6 +1378,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally {
+        closeTable(table);
       }
     }
 
@@ -1411,8 +1429,10 @@ public class ThriftServerRunner implements Runnable {
     public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
         Map<ByteBuffer, ByteBuffer> attributes)
         throws IOError {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Scan scan = new Scan();
         addAttributes(scan, attributes);
         if (tScan.isSetStartRow()) {
@@ -1452,6 +1472,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1459,8 +1481,10 @@ public class ThriftServerRunner implements Runnable {
     public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
         List<ByteBuffer> columns,
         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Scan scan = new Scan(getBytes(startRow));
         addAttributes(scan, attributes);
         if(columns != null && columns.size() != 0) {
@@ -1477,6 +1501,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1485,8 +1511,10 @@ public class ThriftServerRunner implements Runnable {
         ByteBuffer stopRow, List<ByteBuffer> columns,
         Map<ByteBuffer, ByteBuffer> attributes)
         throws IOError, TException {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
         addAttributes(scan, attributes);
         if(columns != null && columns.size() != 0) {
@@ -1503,6 +1531,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1512,8 +1542,10 @@ public class ThriftServerRunner implements Runnable {
                                      List<ByteBuffer> columns,
         Map<ByteBuffer, ByteBuffer> attributes)
         throws IOError, TException {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Scan scan = new Scan(getBytes(startAndPrefix));
         addAttributes(scan, attributes);
         Filter f = new WhileMatchFilter(
@@ -1533,6 +1565,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1540,8 +1574,10 @@ public class ThriftServerRunner implements Runnable {
     public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
         List<ByteBuffer> columns, long timestamp,
         Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Scan scan = new Scan(getBytes(startRow));
         addAttributes(scan, attributes);
         scan.setTimeRange(0, timestamp);
@@ -1559,6 +1595,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1567,8 +1605,10 @@ public class ThriftServerRunner implements Runnable {
         ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp,
         Map<ByteBuffer, ByteBuffer> attributes)
         throws IOError, TException {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
         addAttributes(scan, attributes);
         scan.setTimeRange(0, timestamp);
@@ -1587,17 +1627,21 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
     @Override
     public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
         ByteBuffer tableName) throws IOError, TException {
+      
+      Table table = null;
       try {
         TreeMap<ByteBuffer, ColumnDescriptor> columns =
           new TreeMap<ByteBuffer, ColumnDescriptor>();
 
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         HTableDescriptor desc = table.getTableDescriptor();
 
         for (HColumnDescriptor e : desc.getFamilies()) {
@@ -1608,6 +1652,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally {
+        closeTable(table);
       }
     }
 
@@ -1663,15 +1709,30 @@ public class ThriftServerRunner implements Runnable {
       }
     }
 
+    private void closeTable(Table table) throws IOError
+    {
+      try{
+        if(table != null){
+          table.close();
+        }
+      } catch (IOException e){
+        LOG.error(e.getMessage(), e);
+        throw new IOError(Throwables.getStackTraceAsString(e));
+      }
+    }
+    
     private Result getRowOrBefore(byte[] tableName, byte[] row, byte[] family) throws IOException {
       Scan scan = new Scan(row);
       scan.setReversed(true);
       scan.addFamily(family);
       scan.setStartRow(row);
-
-      Table table = getTable(tableName);
+      Table table = getTable(tableName);      
       try (ResultScanner scanner = table.getScanner(scan)) {
         return scanner.next();
+      } finally{
+        if(table != null){
+          table.close();
+        }
       }
     }
 
@@ -1691,13 +1752,16 @@ public class ThriftServerRunner implements Runnable {
         return;
       }
 
+      Table table = null;
       try {
-        Table table = getTable(tincrement.getTable());
+        table = getTable(tincrement.getTable());
         Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
         table.increment(inc);
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1718,14 +1782,17 @@ public class ThriftServerRunner implements Runnable {
         throw new TException("Must supply a table and a row key; can't append");
       }
 
+      Table table = null;
       try {
-        Table table = getTable(tappend.getTable());
+        table = getTable(tappend.getTable());
         Append append = ThriftUtilities.appendFromThrift(tappend);
         Result result = table.append(append);
         return ThriftUtilities.cellFromHBase(result.rawCells());
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+          closeTable(table);
       }
     }
 
@@ -1761,6 +1828,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IllegalArgumentException e) {
         LOG.warn(e.getMessage(), e);
         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
+      } finally {
+          closeTable(table);
       }
     }
   }


[3/6] hbase git commit: HBASE-14196 Thrift server idle connection timeout issue (Vladimir Rodionov)

Posted by ap...@apache.org.
HBASE-14196 Thrift server idle connection timeout issue (Vladimir Rodionov)

Conflicts:
	hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/93d6fbe9
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/93d6fbe9
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/93d6fbe9

Branch: refs/heads/branch-1.2
Commit: 93d6fbe9271e85ce144b47cdaeb4bf63162dde37
Parents: 2a5b5c7
Author: Andrew Purtell <ap...@apache.org>
Authored: Wed Aug 12 16:32:37 2015 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Wed Aug 12 16:41:09 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/thrift/IncrementCoalescer.java |   8 +-
 .../hadoop/hbase/thrift/ThriftServerRunner.java | 135 ++++++++++++++-----
 2 files changed, 108 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/93d6fbe9/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
----------------------------------------------------------------------
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
index 13a2e50..e937f2d 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
@@ -264,8 +264,9 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
           if (counter == null) {
             continue;
           }
+          Table table = null;
           try {
-            Table table = handler.getTable(row.getTable());
+            table = handler.getTable(row.getTable());
             if (failures > 2) {
               throw new IOException("Auto-Fail rest of ICVs");
             }
@@ -278,8 +279,11 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
                 + Bytes.toStringBinary(row.getRowKey()) + ", "
                 + Bytes.toStringBinary(row.getFamily()) + ", "
                 + Bytes.toStringBinary(row.getQualifier()) + ", " + counter, e);
+          } finally{
+            if(table != null){
+              table.close();
+            }
           }
-
         }
         return failures;
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/93d6fbe9/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
----------------------------------------------------------------------
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
index a71bcf9..4b66a7f 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
@@ -638,15 +638,6 @@ public class ThriftServerRunner implements Runnable {
     private ThriftMetrics metrics = null;
 
     private final ConnectionCache connectionCache;
-
-    private static ThreadLocal<Map<String, Table>> threadLocalTables =
-        new ThreadLocal<Map<String, Table>>() {
-      @Override
-      protected Map<String, Table> initialValue() {
-        return new TreeMap<String, Table>();
-      }
-    };
-
     IncrementCoalescer coalescer = null;
 
     static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
@@ -679,11 +670,7 @@ public class ThriftServerRunner implements Runnable {
     public Table getTable(final byte[] tableName) throws
         IOException {
       String table = Bytes.toString(tableName);
-      Map<String, Table> tables = threadLocalTables.get();
-      if (!tables.containsKey(table)) {
-        tables.put(table, (Table)connectionCache.getTable(table));
-      }
-      return tables.get(table);
+      return connectionCache.getTable(table);
     }
 
     public Table getTable(final ByteBuffer tableName) throws IOException {
@@ -879,8 +866,9 @@ public class ThriftServerRunner implements Runnable {
                               byte[] family,
                               byte[] qualifier,
                               Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Get get = new Get(getBytes(row));
         addAttributes(get, attributes);
         if (qualifier == null) {
@@ -893,6 +881,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally {
+        closeTable(table);
       }
     }
 
@@ -920,8 +910,10 @@ public class ThriftServerRunner implements Runnable {
      */
     public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family,
         byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Get get = new Get(getBytes(row));
         addAttributes(get, attributes);
         if (null == qualifier) {
@@ -935,6 +927,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -962,8 +956,10 @@ public class ThriftServerRunner implements Runnable {
     protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family,
         byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
         throws IOError {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Get get = new Get(getBytes(row));
         addAttributes(get, attributes);
         if (null == qualifier) {
@@ -978,6 +974,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1010,8 +1008,10 @@ public class ThriftServerRunner implements Runnable {
     public List<TRowResult> getRowWithColumnsTs(
         ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         if (columns == null) {
           Get get = new Get(getBytes(row));
           addAttributes(get, attributes);
@@ -1035,6 +1035,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1072,9 +1074,11 @@ public class ThriftServerRunner implements Runnable {
                                                  List<ByteBuffer> rows,
         List<ByteBuffer> columns, long timestamp,
         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      
+      Table table= null;
       try {
         List<Get> gets = new ArrayList<Get>(rows.size());
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         if (metrics != null) {
           metrics.incNumRowKeysInBatchGet(rows.size());
         }
@@ -1100,6 +1104,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1117,8 +1123,9 @@ public class ThriftServerRunner implements Runnable {
                             ByteBuffer row,
                             ByteBuffer column,
         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Delete delete  = new Delete(getBytes(row));
         addAttributes(delete, attributes);
         byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
@@ -1132,6 +1139,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally {
+        closeTable(table);
       }
     }
 
@@ -1146,14 +1155,17 @@ public class ThriftServerRunner implements Runnable {
     public void deleteAllRowTs(
         ByteBuffer tableName, ByteBuffer row, long timestamp,
         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Delete delete  = new Delete(getBytes(row), timestamp);
         addAttributes(delete, attributes);
         table.delete(delete);
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally {
+        closeTable(table);
       }
     }
 
@@ -1260,6 +1272,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IllegalArgumentException e) {
         LOG.warn(e.getMessage(), e);
         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1331,6 +1345,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IllegalArgumentException e) {
         LOG.warn(e.getMessage(), e);
         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1348,7 +1364,7 @@ public class ThriftServerRunner implements Runnable {
     protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
         byte [] family, byte [] qualifier, long amount)
         throws IOError, IllegalArgument, TException {
-      Table table;
+      Table table = null;
       try {
         table = getTable(tableName);
         return table.incrementColumnValue(
@@ -1356,6 +1372,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally {
+        closeTable(table);
       }
     }
 
@@ -1405,8 +1423,10 @@ public class ThriftServerRunner implements Runnable {
     public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
         Map<ByteBuffer, ByteBuffer> attributes)
         throws IOError {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Scan scan = new Scan();
         addAttributes(scan, attributes);
         if (tScan.isSetStartRow()) {
@@ -1446,6 +1466,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1453,8 +1475,10 @@ public class ThriftServerRunner implements Runnable {
     public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
         List<ByteBuffer> columns,
         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Scan scan = new Scan(getBytes(startRow));
         addAttributes(scan, attributes);
         if(columns != null && columns.size() != 0) {
@@ -1471,6 +1495,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1479,8 +1505,10 @@ public class ThriftServerRunner implements Runnable {
         ByteBuffer stopRow, List<ByteBuffer> columns,
         Map<ByteBuffer, ByteBuffer> attributes)
         throws IOError, TException {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
         addAttributes(scan, attributes);
         if(columns != null && columns.size() != 0) {
@@ -1497,6 +1525,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1506,8 +1536,10 @@ public class ThriftServerRunner implements Runnable {
                                      List<ByteBuffer> columns,
         Map<ByteBuffer, ByteBuffer> attributes)
         throws IOError, TException {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Scan scan = new Scan(getBytes(startAndPrefix));
         addAttributes(scan, attributes);
         Filter f = new WhileMatchFilter(
@@ -1527,6 +1559,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1534,8 +1568,10 @@ public class ThriftServerRunner implements Runnable {
     public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
         List<ByteBuffer> columns, long timestamp,
         Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Scan scan = new Scan(getBytes(startRow));
         addAttributes(scan, attributes);
         scan.setTimeRange(0, timestamp);
@@ -1553,6 +1589,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1561,8 +1599,10 @@ public class ThriftServerRunner implements Runnable {
         ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp,
         Map<ByteBuffer, ByteBuffer> attributes)
         throws IOError, TException {
+      
+      Table table = null;
       try {
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
         addAttributes(scan, attributes);
         scan.setTimeRange(0, timestamp);
@@ -1581,17 +1621,21 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
     @Override
     public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
         ByteBuffer tableName) throws IOError, TException {
+      
+      Table table = null;
       try {
         TreeMap<ByteBuffer, ColumnDescriptor> columns =
           new TreeMap<ByteBuffer, ColumnDescriptor>();
 
-        Table table = getTable(tableName);
+        table = getTable(tableName);
         HTableDescriptor desc = table.getTableDescriptor();
 
         for (HColumnDescriptor e : desc.getFamilies()) {
@@ -1602,6 +1646,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally {
+        closeTable(table);
       }
     }
 
@@ -1657,15 +1703,30 @@ public class ThriftServerRunner implements Runnable {
       }
     }
 
+    private void closeTable(Table table) throws IOError
+    {
+      try{
+        if(table != null){
+          table.close();
+        }
+      } catch (IOException e){
+        LOG.error(e.getMessage(), e);
+        throw new IOError(Throwables.getStackTraceAsString(e));
+      }
+    }
+    
     private Result getRowOrBefore(byte[] tableName, byte[] row, byte[] family) throws IOException {
       Scan scan = new Scan(row);
       scan.setReversed(true);
       scan.addFamily(family);
       scan.setStartRow(row);
-
-      Table table = getTable(tableName);
+      Table table = getTable(tableName);      
       try (ResultScanner scanner = table.getScanner(scan)) {
         return scanner.next();
+      } finally{
+        if(table != null){
+          table.close();
+        }
       }
     }
 
@@ -1685,13 +1746,16 @@ public class ThriftServerRunner implements Runnable {
         return;
       }
 
+      Table table = null;
       try {
-        Table table = getTable(tincrement.getTable());
+        table = getTable(tincrement.getTable());
         Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
         table.increment(inc);
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+        closeTable(table);
       }
     }
 
@@ -1712,14 +1776,17 @@ public class ThriftServerRunner implements Runnable {
         throw new TException("Must supply a table and a row key; can't append");
       }
 
+      Table table = null;
       try {
-        Table table = getTable(tappend.getTable());
+        table = getTable(tappend.getTable());
         Append append = ThriftUtilities.appendFromThrift(tappend);
         Result result = table.append(append);
         return ThriftUtilities.cellFromHBase(result.rawCells());
       } catch (IOException e) {
         LOG.warn(e.getMessage(), e);
         throw new IOError(Throwables.getStackTraceAsString(e));
+      } finally{
+          closeTable(table);
       }
     }
 
@@ -1755,6 +1822,8 @@ public class ThriftServerRunner implements Runnable {
       } catch (IllegalArgumentException e) {
         LOG.warn(e.getMessage(), e);
         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
+      } finally {
+          closeTable(table);
       }
     }
   }