You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by je...@apache.org on 2015/09/30 04:48:09 UTC
hbase git commit: HBASE-14394 Properly close the connection after reading records from table: addendum
Repository: hbase
Updated Branches:
refs/heads/branch-1.0 260ddae7b -> 85eec3e20
HBASE-14394 Properly close the connection after reading records from table: addendum
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/85eec3e2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/85eec3e2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/85eec3e2
Branch: refs/heads/branch-1.0
Commit: 85eec3e20303b73cce899b068a8cf6248e76da3f
Parents: 260ddae
Author: Jerry He <je...@apache.org>
Authored: Tue Sep 29 19:48:02 2015 -0700
Committer: Jerry He <je...@apache.org>
Committed: Tue Sep 29 19:48:02 2015 -0700
----------------------------------------------------------------------
.../mapreduce/MultiTableInputFormatBase.java | 53 ++++++++++++++++----
.../hbase/mapreduce/TableRecordReader.java | 9 +---
2 files changed, 45 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/85eec3e2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java
index 0340f7a..cd5fc2f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
@@ -89,29 +88,65 @@ public abstract class MultiTableInputFormatBase extends
+ " previous error. Please look at the previous logs lines from"
+ " the task's full log for more details.");
}
- Connection connection = ConnectionFactory.createConnection(context.getConfiguration());
+ final Connection connection = ConnectionFactory.createConnection(context.getConfiguration());
Table table = connection.getTable(tSplit.getTable());
- TableRecordReader trr = this.tableRecordReader;
+ if (this.tableRecordReader == null) {
+ this.tableRecordReader = new TableRecordReader();
+ }
+ final TableRecordReader trr = this.tableRecordReader;
try {
- // if no table record reader was provided use default
- if (trr == null) {
- trr = new TableRecordReader();
- }
Scan sc = tSplit.getScan();
sc.setStartRow(tSplit.getStartRow());
sc.setStopRow(tSplit.getEndRow());
trr.setScan(sc);
trr.setTable(table);
- trr.setConnection(connection);
+ return new RecordReader<ImmutableBytesWritable, Result>() {
+
+ @Override
+ public void close() throws IOException {
+ trr.close();
+ if (connection != null) {
+ connection.close();
+ }
+ }
+
+ @Override
+ public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
+ return trr.getCurrentKey();
+ }
+
+ @Override
+ public Result getCurrentValue() throws IOException, InterruptedException {
+ return trr.getCurrentValue();
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return trr.getProgress();
+ }
+
+ @Override
+ public void initialize(InputSplit inputsplit, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ trr.initialize(inputsplit, context);
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ return trr.nextKeyValue();
+ }
+ };
} catch (IOException ioe) {
// If there is an exception make sure that all
// resources are closed and released.
trr.close();
+ if (connection != null) {
+ connection.close();
+ }
throw ioe;
}
- return trr;
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/85eec3e2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java
index 9ff90e7..21dc213 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
@@ -41,7 +40,6 @@ public class TableRecordReader
extends RecordReader<ImmutableBytesWritable, Result> {
private TableRecordReaderImpl recordReaderImpl = new TableRecordReaderImpl();
- private Connection connection = null;
/**
* Restart from survivable exceptions by creating a new scanner.
@@ -87,10 +85,8 @@ extends RecordReader<ImmutableBytesWritable, Result> {
* @see org.apache.hadoop.mapreduce.RecordReader#close()
*/
@Override
- public void close() throws IOException {
+ public void close() {
this.recordReaderImpl.close();
- if (this.connection != null)
- this.connection.close();
}
/**
@@ -162,7 +158,4 @@ extends RecordReader<ImmutableBytesWritable, Result> {
return this.recordReaderImpl.getProgress();
}
- public void setConnection(Connection connection) {
- this.connection = connection;
- }
}