You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2015/08/13 02:39:54 UTC
[5/6] hbase git commit: HBASE-14196 Thrift server idle connection
timeout issue (Vladimir Rodionov)
HBASE-14196 Thrift server idle connection timeout issue (Vladimir Rodionov)
Conflicts:
hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a81d0a4a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a81d0a4a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a81d0a4a
Branch: refs/heads/branch-1.0
Commit: a81d0a4a7b9792681b12b5f7a7fc59aff70ae884
Parents: b99b567
Author: Andrew Purtell <ap...@apache.org>
Authored: Wed Aug 12 16:32:37 2015 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Wed Aug 12 16:41:17 2015 -0700
----------------------------------------------------------------------
.../hadoop/hbase/thrift/IncrementCoalescer.java | 8 +-
.../hadoop/hbase/thrift/ThriftServerRunner.java | 135 ++++++++++++++-----
2 files changed, 108 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/a81d0a4a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
----------------------------------------------------------------------
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
index bdbe445..ab1cbee 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
@@ -264,8 +264,9 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
if (counter == null) {
continue;
}
+ Table table = null;
try {
- Table table = handler.getTable(row.getTable());
+ table = handler.getTable(row.getTable());
if (failures > 2) {
throw new IOException("Auto-Fail rest of ICVs");
}
@@ -278,8 +279,11 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
+ Bytes.toStringBinary(row.getRowKey()) + ", "
+ Bytes.toStringBinary(row.getFamily()) + ", "
+ Bytes.toStringBinary(row.getQualifier()) + ", " + counter, e);
+ } finally{
+ if(table != null){
+ table.close();
+ }
}
-
}
return failures;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a81d0a4a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
----------------------------------------------------------------------
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
index 9cfaf3f..109e874 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
@@ -639,15 +639,6 @@ public class ThriftServerRunner implements Runnable {
private ThriftMetrics metrics = null;
private final ConnectionCache connectionCache;
-
- private static ThreadLocal<Map<String, Table>> threadLocalTables =
- new ThreadLocal<Map<String, Table>>() {
- @Override
- protected Map<String, Table> initialValue() {
- return new TreeMap<String, Table>();
- }
- };
-
IncrementCoalescer coalescer = null;
static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
@@ -681,11 +672,7 @@ public class ThriftServerRunner implements Runnable {
public Table getTable(final byte[] tableName) throws
IOException {
String table = Bytes.toString(tableName);
- Map<String, Table> tables = threadLocalTables.get();
- if (!tables.containsKey(table)) {
- tables.put(table, (Table)connectionCache.getTable(table));
- }
- return tables.get(table);
+ return connectionCache.getTable(table);
}
public Table getTable(final ByteBuffer tableName) throws IOException {
@@ -882,8 +869,9 @@ public class ThriftServerRunner implements Runnable {
byte[] family,
byte[] qualifier,
Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+ Table table = null;
try {
- Table table = getTable(tableName);
+ table = getTable(tableName);
Get get = new Get(getBytes(row));
addAttributes(get, attributes);
if (qualifier == null) {
@@ -896,6 +884,8 @@ public class ThriftServerRunner implements Runnable {
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
throw new IOError(Throwables.getStackTraceAsString(e));
+ } finally {
+ closeTable(table);
}
}
@@ -924,8 +914,10 @@ public class ThriftServerRunner implements Runnable {
*/
public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family,
byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+
+ Table table = null;
try {
- Table table = getTable(tableName);
+ table = getTable(tableName);
Get get = new Get(getBytes(row));
addAttributes(get, attributes);
if (null == qualifier) {
@@ -939,6 +931,8 @@ public class ThriftServerRunner implements Runnable {
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
throw new IOError(Throwables.getStackTraceAsString(e));
+ } finally{
+ closeTable(table);
}
}
@@ -967,8 +961,10 @@ public class ThriftServerRunner implements Runnable {
protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family,
byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
throws IOError {
+
+ Table table = null;
try {
- Table table = getTable(tableName);
+ table = getTable(tableName);
Get get = new Get(getBytes(row));
addAttributes(get, attributes);
if (null == qualifier) {
@@ -983,6 +979,8 @@ public class ThriftServerRunner implements Runnable {
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
throw new IOError(Throwables.getStackTraceAsString(e));
+ } finally{
+ closeTable(table);
}
}
@@ -1015,8 +1013,10 @@ public class ThriftServerRunner implements Runnable {
public List<TRowResult> getRowWithColumnsTs(
ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+
+ Table table = null;
try {
- Table table = getTable(tableName);
+ table = getTable(tableName);
if (columns == null) {
Get get = new Get(getBytes(row));
addAttributes(get, attributes);
@@ -1040,6 +1040,8 @@ public class ThriftServerRunner implements Runnable {
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
throw new IOError(Throwables.getStackTraceAsString(e));
+ } finally{
+ closeTable(table);
}
}
@@ -1077,9 +1079,11 @@ public class ThriftServerRunner implements Runnable {
List<ByteBuffer> rows,
List<ByteBuffer> columns, long timestamp,
Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+
+ Table table= null;
try {
List<Get> gets = new ArrayList<Get>(rows.size());
- Table table = getTable(tableName);
+ table = getTable(tableName);
if (metrics != null) {
metrics.incNumRowKeysInBatchGet(rows.size());
}
@@ -1105,6 +1109,8 @@ public class ThriftServerRunner implements Runnable {
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
throw new IOError(Throwables.getStackTraceAsString(e));
+ } finally{
+ closeTable(table);
}
}
@@ -1122,8 +1128,9 @@ public class ThriftServerRunner implements Runnable {
ByteBuffer row,
ByteBuffer column,
long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+ Table table = null;
try {
- Table table = getTable(tableName);
+ table = getTable(tableName);
Delete delete = new Delete(getBytes(row));
addAttributes(delete, attributes);
byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
@@ -1137,6 +1144,8 @@ public class ThriftServerRunner implements Runnable {
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
throw new IOError(Throwables.getStackTraceAsString(e));
+ } finally {
+ closeTable(table);
}
}
@@ -1151,14 +1160,17 @@ public class ThriftServerRunner implements Runnable {
public void deleteAllRowTs(
ByteBuffer tableName, ByteBuffer row, long timestamp,
Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+ Table table = null;
try {
- Table table = getTable(tableName);
+ table = getTable(tableName);
Delete delete = new Delete(getBytes(row), timestamp);
addAttributes(delete, attributes);
table.delete(delete);
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
throw new IOError(Throwables.getStackTraceAsString(e));
+ } finally {
+ closeTable(table);
}
}
@@ -1265,6 +1277,8 @@ public class ThriftServerRunner implements Runnable {
} catch (IllegalArgumentException e) {
LOG.warn(e.getMessage(), e);
throw new IllegalArgument(Throwables.getStackTraceAsString(e));
+ } finally{
+ closeTable(table);
}
}
@@ -1336,6 +1350,8 @@ public class ThriftServerRunner implements Runnable {
} catch (IllegalArgumentException e) {
LOG.warn(e.getMessage(), e);
throw new IllegalArgument(Throwables.getStackTraceAsString(e));
+ } finally{
+ closeTable(table);
}
}
@@ -1354,7 +1370,7 @@ public class ThriftServerRunner implements Runnable {
protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
byte [] family, byte [] qualifier, long amount)
throws IOError, IllegalArgument, TException {
- Table table;
+ Table table = null;
try {
table = getTable(tableName);
return table.incrementColumnValue(
@@ -1362,6 +1378,8 @@ public class ThriftServerRunner implements Runnable {
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
throw new IOError(Throwables.getStackTraceAsString(e));
+ } finally {
+ closeTable(table);
}
}
@@ -1411,8 +1429,10 @@ public class ThriftServerRunner implements Runnable {
public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
Map<ByteBuffer, ByteBuffer> attributes)
throws IOError {
+
+ Table table = null;
try {
- Table table = getTable(tableName);
+ table = getTable(tableName);
Scan scan = new Scan();
addAttributes(scan, attributes);
if (tScan.isSetStartRow()) {
@@ -1452,6 +1472,8 @@ public class ThriftServerRunner implements Runnable {
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
throw new IOError(Throwables.getStackTraceAsString(e));
+ } finally{
+ closeTable(table);
}
}
@@ -1459,8 +1481,10 @@ public class ThriftServerRunner implements Runnable {
public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
List<ByteBuffer> columns,
Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+
+ Table table = null;
try {
- Table table = getTable(tableName);
+ table = getTable(tableName);
Scan scan = new Scan(getBytes(startRow));
addAttributes(scan, attributes);
if(columns != null && columns.size() != 0) {
@@ -1477,6 +1501,8 @@ public class ThriftServerRunner implements Runnable {
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
throw new IOError(Throwables.getStackTraceAsString(e));
+ } finally{
+ closeTable(table);
}
}
@@ -1485,8 +1511,10 @@ public class ThriftServerRunner implements Runnable {
ByteBuffer stopRow, List<ByteBuffer> columns,
Map<ByteBuffer, ByteBuffer> attributes)
throws IOError, TException {
+
+ Table table = null;
try {
- Table table = getTable(tableName);
+ table = getTable(tableName);
Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
addAttributes(scan, attributes);
if(columns != null && columns.size() != 0) {
@@ -1503,6 +1531,8 @@ public class ThriftServerRunner implements Runnable {
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
throw new IOError(Throwables.getStackTraceAsString(e));
+ } finally{
+ closeTable(table);
}
}
@@ -1512,8 +1542,10 @@ public class ThriftServerRunner implements Runnable {
List<ByteBuffer> columns,
Map<ByteBuffer, ByteBuffer> attributes)
throws IOError, TException {
+
+ Table table = null;
try {
- Table table = getTable(tableName);
+ table = getTable(tableName);
Scan scan = new Scan(getBytes(startAndPrefix));
addAttributes(scan, attributes);
Filter f = new WhileMatchFilter(
@@ -1533,6 +1565,8 @@ public class ThriftServerRunner implements Runnable {
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
throw new IOError(Throwables.getStackTraceAsString(e));
+ } finally{
+ closeTable(table);
}
}
@@ -1540,8 +1574,10 @@ public class ThriftServerRunner implements Runnable {
public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
List<ByteBuffer> columns, long timestamp,
Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
+
+ Table table = null;
try {
- Table table = getTable(tableName);
+ table = getTable(tableName);
Scan scan = new Scan(getBytes(startRow));
addAttributes(scan, attributes);
scan.setTimeRange(0, timestamp);
@@ -1559,6 +1595,8 @@ public class ThriftServerRunner implements Runnable {
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
throw new IOError(Throwables.getStackTraceAsString(e));
+ } finally{
+ closeTable(table);
}
}
@@ -1567,8 +1605,10 @@ public class ThriftServerRunner implements Runnable {
ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp,
Map<ByteBuffer, ByteBuffer> attributes)
throws IOError, TException {
+
+ Table table = null;
try {
- Table table = getTable(tableName);
+ table = getTable(tableName);
Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
addAttributes(scan, attributes);
scan.setTimeRange(0, timestamp);
@@ -1587,17 +1627,21 @@ public class ThriftServerRunner implements Runnable {
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
throw new IOError(Throwables.getStackTraceAsString(e));
+ } finally{
+ closeTable(table);
}
}
@Override
public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
ByteBuffer tableName) throws IOError, TException {
+
+ Table table = null;
try {
TreeMap<ByteBuffer, ColumnDescriptor> columns =
new TreeMap<ByteBuffer, ColumnDescriptor>();
- Table table = getTable(tableName);
+ table = getTable(tableName);
HTableDescriptor desc = table.getTableDescriptor();
for (HColumnDescriptor e : desc.getFamilies()) {
@@ -1608,6 +1652,8 @@ public class ThriftServerRunner implements Runnable {
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
throw new IOError(Throwables.getStackTraceAsString(e));
+ } finally {
+ closeTable(table);
}
}
@@ -1663,15 +1709,30 @@ public class ThriftServerRunner implements Runnable {
}
}
+ private void closeTable(Table table) throws IOError
+ {
+ try{
+ if(table != null){
+ table.close();
+ }
+ } catch (IOException e){
+ LOG.error(e.getMessage(), e);
+ throw new IOError(Throwables.getStackTraceAsString(e));
+ }
+ }
+
private Result getRowOrBefore(byte[] tableName, byte[] row, byte[] family) throws IOException {
Scan scan = new Scan(row);
scan.setReversed(true);
scan.addFamily(family);
scan.setStartRow(row);
-
- Table table = getTable(tableName);
+ Table table = getTable(tableName);
try (ResultScanner scanner = table.getScanner(scan)) {
return scanner.next();
+ } finally{
+ if(table != null){
+ table.close();
+ }
}
}
@@ -1691,13 +1752,16 @@ public class ThriftServerRunner implements Runnable {
return;
}
+ Table table = null;
try {
- Table table = getTable(tincrement.getTable());
+ table = getTable(tincrement.getTable());
Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
table.increment(inc);
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
throw new IOError(Throwables.getStackTraceAsString(e));
+ } finally{
+ closeTable(table);
}
}
@@ -1718,14 +1782,17 @@ public class ThriftServerRunner implements Runnable {
throw new TException("Must supply a table and a row key; can't append");
}
+ Table table = null;
try {
- Table table = getTable(tappend.getTable());
+ table = getTable(tappend.getTable());
Append append = ThriftUtilities.appendFromThrift(tappend);
Result result = table.append(append);
return ThriftUtilities.cellFromHBase(result.rawCells());
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
throw new IOError(Throwables.getStackTraceAsString(e));
+ } finally{
+ closeTable(table);
}
}
@@ -1761,6 +1828,8 @@ public class ThriftServerRunner implements Runnable {
} catch (IllegalArgumentException e) {
LOG.warn(e.getMessage(), e);
throw new IllegalArgument(Throwables.getStackTraceAsString(e));
+ } finally {
+ closeTable(table);
}
}
}