You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2015/08/13 02:39:55 UTC
[6/6] hbase git commit: HBASE-14196 Thrift server idle connection timeout issue (Vladimir Rodionov)
HBASE-14196 Thrift server idle connection timeout issue (Vladimir Rodionov)
Conflicts:
hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a705a748
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a705a748
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a705a748
Branch: refs/heads/0.98
Commit: a705a7484637c94c1d4c3555c3888cf3908611ce
Parents: 87729cc
Author: Andrew Purtell <ap...@apache.org>
Authored: Wed Aug 12 16:32:37 2015 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Wed Aug 12 17:05:12 2015 -0700
----------------------------------------------------------------------
.../hadoop/hbase/thrift/IncrementCoalescer.java | 11 +-
.../hadoop/hbase/thrift/ThriftServerRunner.java | 176 ++++++++++++-------
2 files changed, 122 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/a705a748/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
----------------------------------------------------------------------
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
index bb45c88..78a5874 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
@@ -35,7 +35,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.thrift.ThriftServerRunner.HBaseHandler;
import org.apache.hadoop.hbase.thrift.generated.TIncrement;
import org.apache.hadoop.hbase.util.Bytes;
@@ -163,7 +163,6 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
protected final Log LOG = LogFactory.getLog(this.getClass().getName());
- @SuppressWarnings("deprecation")
public IncrementCoalescer(HBaseHandler hand) {
this.handler = hand;
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
@@ -264,8 +263,9 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
if (counter == null) {
continue;
}
+ HTableInterface table = null;
try {
- HTable table = handler.getTable(row.getTable());
+ table = handler.getTable(row.getTable());
if (failures > 2) {
throw new IOException("Auto-Fail rest of ICVs");
}
@@ -278,8 +278,11 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
+ Bytes.toStringBinary(row.getRowKey()) + ", "
+ Bytes.toStringBinary(row.getFamily()) + ", "
+ Bytes.toStringBinary(row.getQualifier()) + ", " + counter, e);
+ } finally{
+ if(table != null){
+ table.close();
+ }
}
-
}
return failures;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a705a748/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
----------------------------------------------------------------------
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
index e245309..a73ec40 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.OperationWithAttributes;
import org.apache.hadoop.hbase.client.Put;
@@ -116,6 +117,7 @@ import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransportFactory;
import com.google.common.base.Joiner;
+import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
@@ -538,15 +540,6 @@ public class ThriftServerRunner implements Runnable {
private ThriftMetrics metrics = null;
private final ConnectionCache connectionCache;
-
- private static ThreadLocal<Map<String, HTable>> threadLocalTables =
- new ThreadLocal<Map<String, HTable>>() {
- @Override
- protected Map<String, HTable> initialValue() {
- return new TreeMap<String, HTable>();
- }
- };
-
IncrementCoalescer coalescer = null;
static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
@@ -577,17 +570,13 @@ public class ThriftServerRunner implements Runnable {
* @throws IOException
* @throws IOError
*/
- public HTable getTable(final byte[] tableName) throws
+ public HTableInterface getTable(final byte[] tableName) throws
IOException {
String table = Bytes.toString(tableName);
- Map<String, HTable> tables = threadLocalTables.get();
- if (!tables.containsKey(table)) {
- tables.put(table, (HTable)connectionCache.getTable(table));
- }
- return tables.get(table);
+ return connectionCache.getTable(table);
}
- public HTable getTable(final ByteBuffer tableName) throws IOException {
+ public HTableInterface getTable(final ByteBuffer tableName) throws IOException {
return getTable(getBytes(tableName));
}
@@ -726,14 +715,13 @@ public class ThriftServerRunner implements Runnable {
public List<TRegionInfo> getTableRegions(ByteBuffer tableName)
throws IOError {
try {
- HTable table;
+ HTableInterface table;
try {
table = getTable(tableName);
} catch (TableNotFoundException ex) {
return new ArrayList<TRegionInfo>();
}
- Map<HRegionInfo, ServerName> regionLocations =
- table.getRegionLocations();
+ Map<HRegionInfo, ServerName> regionLocations = ((HTable)table).getRegionLocations();
List<TRegionInfo> results = new ArrayList<TRegionInfo>();
for (Map.Entry<HRegionInfo, ServerName> entry :
regionLocations.entrySet()) {
@@ -788,8 +776,9 @@ public class ThriftServerRunner implements Runnable {
byte[] family,
byte[] qualifier,
Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+ HTableInterface table = null;
try {
- HTable table = getTable(tableName);
+ table = getTable(tableName);
Get get = new Get(getBytes(row));
addAttributes(get, attributes);
if (qualifier == null) {
@@ -801,7 +790,9 @@ public class ThriftServerRunner implements Runnable {
return ThriftUtilities.cellFromHBase(result.rawCells());
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
- throw new IOError(e.getMessage());
+ throw new IOError(Throwables.getStackTraceAsString(e));
+ } finally {
+ closeTable(table);
}
}
@@ -830,8 +821,9 @@ public class ThriftServerRunner implements Runnable {
*/
public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family,
byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+ HTableInterface table = null;
try {
- HTable table = getTable(tableName);
+ table = getTable(tableName);
Get get = new Get(getBytes(row));
addAttributes(get, attributes);
if (null == qualifier) {
@@ -844,7 +836,9 @@ public class ThriftServerRunner implements Runnable {
return ThriftUtilities.cellFromHBase(result.rawCells());
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
- throw new IOError(e.getMessage());
+ throw new IOError(Throwables.getStackTraceAsString(e));
+ } finally{
+ closeTable(table);
}
}
@@ -873,8 +867,9 @@ public class ThriftServerRunner implements Runnable {
protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family,
byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
throws IOError {
+ HTableInterface table = null;
try {
- HTable table = getTable(tableName);
+ table = getTable(tableName);
Get get = new Get(getBytes(row));
addAttributes(get, attributes);
if (null == qualifier) {
@@ -888,7 +883,9 @@ public class ThriftServerRunner implements Runnable {
return ThriftUtilities.cellFromHBase(result.rawCells());
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
- throw new IOError(e.getMessage());
+ throw new IOError(Throwables.getStackTraceAsString(e));
+ } finally{
+ closeTable(table);
}
}
@@ -921,8 +918,9 @@ public class ThriftServerRunner implements Runnable {
public List<TRowResult> getRowWithColumnsTs(
ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+ HTableInterface table = null;
try {
- HTable table = getTable(tableName);
+ table = getTable(tableName);
if (columns == null) {
Get get = new Get(getBytes(row));
addAttributes(get, attributes);
@@ -945,7 +943,9 @@ public class ThriftServerRunner implements Runnable {
return ThriftUtilities.rowResultFromHBase(result);
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
- throw new IOError(e.getMessage());
+ throw new IOError(Throwables.getStackTraceAsString(e));
+ } finally{
+ closeTable(table);
}
}
@@ -983,9 +983,10 @@ public class ThriftServerRunner implements Runnable {
List<ByteBuffer> rows,
List<ByteBuffer> columns, long timestamp,
Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+ HTableInterface table = null;
try {
List<Get> gets = new ArrayList<Get>(rows.size());
- HTable table = getTable(tableName);
+ table = getTable(tableName);
if (metrics != null) {
metrics.incNumRowKeysInBatchGet(rows.size());
}
@@ -1010,7 +1011,9 @@ public class ThriftServerRunner implements Runnable {
return ThriftUtilities.rowResultFromHBase(result);
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
- throw new IOError(e.getMessage());
+ throw new IOError(Throwables.getStackTraceAsString(e));
+ } finally{
+ closeTable(table);
}
}
@@ -1028,8 +1031,9 @@ public class ThriftServerRunner implements Runnable {
ByteBuffer row,
ByteBuffer column,
long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+ HTableInterface table = null;
try {
- HTable table = getTable(tableName);
+ table = getTable(tableName);
Delete delete = new Delete(getBytes(row));
addAttributes(delete, attributes);
byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
@@ -1042,7 +1046,9 @@ public class ThriftServerRunner implements Runnable {
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
- throw new IOError(e.getMessage());
+ throw new IOError(Throwables.getStackTraceAsString(e));
+ } finally {
+ closeTable(table);
}
}
@@ -1057,14 +1063,17 @@ public class ThriftServerRunner implements Runnable {
public void deleteAllRowTs(
ByteBuffer tableName, ByteBuffer row, long timestamp,
Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+ HTableInterface table = null;
try {
- HTable table = getTable(tableName);
+ table = getTable(tableName);
Delete delete = new Delete(getBytes(row), timestamp);
addAttributes(delete, attributes);
table.delete(delete);
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
- throw new IOError(e.getMessage());
+ throw new IOError(Throwables.getStackTraceAsString(e));
+ } finally {
+ closeTable(table);
}
}
@@ -1122,7 +1131,7 @@ public class ThriftServerRunner implements Runnable {
List<Mutation> mutations, long timestamp,
Map<ByteBuffer, ByteBuffer> attributes)
throws IOError, IllegalArgument {
- HTable table = null;
+ HTableInterface table = null;
try {
table = getTable(tableName);
Put put = new Put(getBytes(row), timestamp);
@@ -1166,7 +1175,9 @@ public class ThriftServerRunner implements Runnable {
throw new IOError(e.getMessage());
} catch (IllegalArgumentException e) {
LOG.warn(e.getMessage(), e);
- throw new IllegalArgument(e.getMessage());
+ throw new IllegalArgument(Throwables.getStackTraceAsString(e));
+ } finally{
+ closeTable(table);
}
}
@@ -1223,8 +1234,7 @@ public class ThriftServerRunner implements Runnable {
if (!put.isEmpty())
puts.add(put);
}
-
- HTable table = null;
+ HTableInterface table = null;
try {
table = getTable(tableName);
if (!puts.isEmpty())
@@ -1237,7 +1247,9 @@ public class ThriftServerRunner implements Runnable {
throw new IOError(e.getMessage());
} catch (IllegalArgumentException e) {
LOG.warn(e.getMessage(), e);
- throw new IllegalArgument(e.getMessage());
+ throw new IllegalArgument(Throwables.getStackTraceAsString(e));
+ } finally{
+ closeTable(table);
}
}
@@ -1256,14 +1268,16 @@ public class ThriftServerRunner implements Runnable {
protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
byte [] family, byte [] qualifier, long amount)
throws IOError, IllegalArgument, TException {
- HTable table;
+ HTableInterface table = null;
try {
table = getTable(tableName);
return table.incrementColumnValue(
getBytes(row), family, qualifier, amount);
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
- throw new IOError(e.getMessage());
+ throw new IOError(Throwables.getStackTraceAsString(e));
+ } finally {
+ closeTable(table);
}
}
@@ -1313,8 +1327,9 @@ public class ThriftServerRunner implements Runnable {
public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
Map<ByteBuffer, ByteBuffer> attributes)
throws IOError {
+ HTableInterface table = null;
try {
- HTable table = getTable(tableName);
+ table = getTable(tableName);
Scan scan = new Scan();
addAttributes(scan, attributes);
if (tScan.isSetStartRow()) {
@@ -1353,7 +1368,9 @@ public class ThriftServerRunner implements Runnable {
return addScanner(table.getScanner(scan), tScan.sortColumns);
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
- throw new IOError(e.getMessage());
+ throw new IOError(Throwables.getStackTraceAsString(e));
+ } finally{
+ closeTable(table);
}
}
@@ -1361,8 +1378,9 @@ public class ThriftServerRunner implements Runnable {
public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
List<ByteBuffer> columns,
Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+ HTableInterface table = null;
try {
- HTable table = getTable(tableName);
+ table = getTable(tableName);
Scan scan = new Scan(getBytes(startRow));
addAttributes(scan, attributes);
if(columns != null && columns.size() != 0) {
@@ -1378,7 +1396,9 @@ public class ThriftServerRunner implements Runnable {
return addScanner(table.getScanner(scan), false);
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
- throw new IOError(e.getMessage());
+ throw new IOError(Throwables.getStackTraceAsString(e));
+ } finally{
+ closeTable(table);
}
}
@@ -1387,8 +1407,9 @@ public class ThriftServerRunner implements Runnable {
ByteBuffer stopRow, List<ByteBuffer> columns,
Map<ByteBuffer, ByteBuffer> attributes)
throws IOError, TException {
+ HTableInterface table = null;
try {
- HTable table = getTable(tableName);
+ table = getTable(tableName);
Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
addAttributes(scan, attributes);
if(columns != null && columns.size() != 0) {
@@ -1404,7 +1425,9 @@ public class ThriftServerRunner implements Runnable {
return addScanner(table.getScanner(scan), false);
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
- throw new IOError(e.getMessage());
+ throw new IOError(Throwables.getStackTraceAsString(e));
+ } finally{
+ closeTable(table);
}
}
@@ -1414,8 +1437,9 @@ public class ThriftServerRunner implements Runnable {
List<ByteBuffer> columns,
Map<ByteBuffer, ByteBuffer> attributes)
throws IOError, TException {
+ HTableInterface table = null;
try {
- HTable table = getTable(tableName);
+ table = getTable(tableName);
Scan scan = new Scan(getBytes(startAndPrefix));
addAttributes(scan, attributes);
Filter f = new WhileMatchFilter(
@@ -1434,7 +1458,9 @@ public class ThriftServerRunner implements Runnable {
return addScanner(table.getScanner(scan), false);
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
- throw new IOError(e.getMessage());
+ throw new IOError(Throwables.getStackTraceAsString(e));
+ } finally{
+ closeTable(table);
}
}
@@ -1442,8 +1468,9 @@ public class ThriftServerRunner implements Runnable {
public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
List<ByteBuffer> columns, long timestamp,
Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
+ HTableInterface table = null;
try {
- HTable table = getTable(tableName);
+ table = getTable(tableName);
Scan scan = new Scan(getBytes(startRow));
addAttributes(scan, attributes);
scan.setTimeRange(0, timestamp);
@@ -1460,7 +1487,9 @@ public class ThriftServerRunner implements Runnable {
return addScanner(table.getScanner(scan), false);
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
- throw new IOError(e.getMessage());
+ throw new IOError(Throwables.getStackTraceAsString(e));
+ } finally{
+ closeTable(table);
}
}
@@ -1469,8 +1498,9 @@ public class ThriftServerRunner implements Runnable {
ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp,
Map<ByteBuffer, ByteBuffer> attributes)
throws IOError, TException {
+ HTableInterface table = null;
try {
- HTable table = getTable(tableName);
+ table = getTable(tableName);
Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
addAttributes(scan, attributes);
scan.setTimeRange(0, timestamp);
@@ -1488,18 +1518,21 @@ public class ThriftServerRunner implements Runnable {
return addScanner(table.getScanner(scan), false);
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
- throw new IOError(e.getMessage());
+ throw new IOError(Throwables.getStackTraceAsString(e));
+ } finally{
+ closeTable(table);
}
}
@Override
public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
ByteBuffer tableName) throws IOError, TException {
+ HTableInterface table = null;
try {
TreeMap<ByteBuffer, ColumnDescriptor> columns =
new TreeMap<ByteBuffer, ColumnDescriptor>();
- HTable table = getTable(tableName);
+ table = getTable(tableName);
HTableDescriptor desc = table.getTableDescriptor();
for (HColumnDescriptor e : desc.getFamilies()) {
@@ -1509,27 +1542,33 @@ public class ThriftServerRunner implements Runnable {
return columns;
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
- throw new IOError(e.getMessage());
+ throw new IOError(Throwables.getStackTraceAsString(e));
+ } finally {
+ closeTable(table);
}
}
@Override
public List<TCell> getRowOrBefore(ByteBuffer tableName, ByteBuffer row,
ByteBuffer family) throws IOError {
+ HTableInterface table = null;
try {
- HTable table = getTable(getBytes(tableName));
+ table = getTable(getBytes(tableName));
Result result = table.getRowOrBefore(getBytes(row), getBytes(family));
return ThriftUtilities.cellFromHBase(result.rawCells());
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
throw new IOError(e.getMessage());
+ } finally {
+ closeTable(table);
}
}
@Override
public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
+ HTableInterface table = null;
try {
- HTable table = getTable(TableName.META_TABLE_NAME.getName());
+ table = getTable(TableName.META_TABLE_NAME.getName());
byte[] row = getBytes(searchRow);
Result startRowResult = table.getRowOrBefore(
row, HConstants.CATALOG_FAMILY);
@@ -1562,7 +1601,21 @@ public class ThriftServerRunner implements Runnable {
return region;
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
- throw new IOError(e.getMessage());
+ throw new IOError(Throwables.getStackTraceAsString(e));
+ } finally {
+ closeTable(table);
+ }
+ }
+
+ private void closeTable(HTableInterface table) throws IOError
+ {
+ try{
+ if(table != null){
+ table.close();
+ }
+ } catch (IOException e){
+ LOG.error(e.getMessage(), e);
+ throw new IOError(Throwables.getStackTraceAsString(e));
}
}
@@ -1582,13 +1635,16 @@ public class ThriftServerRunner implements Runnable {
return;
}
+ HTableInterface table = null;
try {
- HTable table = getTable(tincrement.getTable());
+ table = getTable(tincrement.getTable());
Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
table.increment(inc);
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
- throw new IOError(e.getMessage());
+ throw new IOError(Throwables.getStackTraceAsString(e));
+ } finally{
+ closeTable(table);
}
}
@@ -1604,8 +1660,6 @@ public class ThriftServerRunner implements Runnable {
}
}
-
-
/**
* Adds all the attributes into the Operation object
*/