You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ra...@apache.org on 2017/11/09 08:31:29 UTC
phoenix git commit: PHOENIX-4303 Remove HTable and Use Table APIs(Rajeshbabu)
Repository: phoenix
Updated Branches:
refs/heads/5.x-HBase-2.0 62027bff1 -> d85e9165a
PHOENIX-4303 Remove HTable and Use Table APIs(Rajeshbabu)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d85e9165
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d85e9165
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d85e9165
Branch: refs/heads/5.x-HBase-2.0
Commit: d85e9165a7113449efb30cc9ab645e51da89629d
Parents: 62027bf
Author: Rajeshbabu Chintaguntla <ra...@apache.org>
Authored: Thu Nov 9 14:01:08 2017 +0530
Committer: Rajeshbabu Chintaguntla <ra...@apache.org>
Committed: Thu Nov 9 14:01:08 2017 +0530
----------------------------------------------------------------------
.../wal/WALRecoveryRegionPostOpenIT.java | 13 ++++++----
...ReplayWithIndexWritesAndCompressedWALIT.java | 9 ++++---
.../phoenix/end2end/AggregateQueryIT.java | 20 ++++++++++-----
.../phoenix/end2end/FlappingLocalIndexIT.java | 7 +++---
.../end2end/NamespaceSchemaMappingIT.java | 8 +++---
.../apache/phoenix/end2end/RowTimestampIT.java | 14 +++++++----
.../apache/phoenix/end2end/StoreNullsIT.java | 9 ++++---
.../org/apache/phoenix/end2end/UseSchemaIT.java | 6 +++--
.../phoenix/end2end/index/DropColumnIT.java | 18 +++++++-------
.../index/IndexWithTableSchemaChangeIT.java | 6 ++---
.../phoenix/end2end/index/LocalIndexIT.java | 14 ++++++-----
.../index/MutableIndexReplicationIT.java | 8 +++---
...erRegionServerIndexRpcControllerFactory.java | 3 +--
...egionServerMetadataRpcControllerFactory.java | 3 +--
.../IndexHalfStoreFileReaderGenerator.java | 8 +++---
.../UngroupedAggregateRegionObserver.java | 1 -
.../apache/phoenix/execute/DelegateHTable.java | 26 ++++++++++++++++++++
.../phoenix/mapreduce/AbstractBulkLoadTool.java | 25 ++++++++++++-------
.../mapreduce/MultiHfileOutputFormat.java | 20 +++++++++------
.../phoenix/mapreduce/index/IndexTool.java | 25 ++++++++++++-------
.../transaction/PhoenixTransactionalTable.java | 10 ++++----
.../phoenix/hbase/index/IndexTestingUtils.java | 10 ++++----
.../index/write/TestWALRecoveryCaching.java | 11 +++++----
23 files changed, 174 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALRecoveryRegionPostOpenIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALRecoveryRegionPostOpenIT.java b/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALRecoveryRegionPostOpenIT.java
index d74ddb2..20d59a7 100644
--- a/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALRecoveryRegionPostOpenIT.java
+++ b/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALRecoveryRegionPostOpenIT.java
@@ -44,12 +44,14 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
@@ -192,7 +194,8 @@ public class WALRecoveryRegionPostOpenIT extends BaseTest {
this.assertRegionServerDifferent(miniHBaseCluster);
Scan scan = new Scan();
- HTable primaryTable = new HTable(getUtility().getConfiguration(), DATA_TABLE_NAME);
+ org.apache.hadoop.hbase.client.Connection hbaseConn = ConnectionFactory.createConnection(getUtility().getConfiguration());
+ Table primaryTable = hbaseConn.getTable(TableName.valueOf(DATA_TABLE_NAME));
ResultScanner resultScanner = primaryTable.getScanner(scan);
int count = 0;
for (Result result : resultScanner) {
@@ -244,7 +247,7 @@ public class WALRecoveryRegionPostOpenIT extends BaseTest {
// the index table is one row
- HTable indexTable = new HTable(getUtility().getConfiguration(), INDEX_TABLE_NAME);
+ Table indexTable = hbaseConn.getTable(TableName.valueOf(INDEX_TABLE_NAME));
resultScanner = indexTable.getScanner(scan);
count = 0;
for (Result result : resultScanner) {
@@ -256,8 +259,8 @@ public class WALRecoveryRegionPostOpenIT extends BaseTest {
scan = new Scan();
primaryTable.close();
- primaryTable = new HTable(getUtility().getConfiguration(), DATA_TABLE_NAME);
- primaryTable.getConnection().clearRegionCache();
+ primaryTable = hbaseConn.getTable(TableName.valueOf(DATA_TABLE_NAME));
+ ((ClusterConnection)hbaseConn).clearRegionCache();
resultScanner = primaryTable.getScanner(scan);
count = 0;
for (Result result : resultScanner) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALReplayWithIndexWritesAndCompressedWALIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALReplayWithIndexWritesAndCompressedWALIT.java b/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALReplayWithIndexWritesAndCompressedWALIT.java
index dfff8fe..b504acd 100644
--- a/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALReplayWithIndexWritesAndCompressedWALIT.java
+++ b/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALReplayWithIndexWritesAndCompressedWALIT.java
@@ -38,13 +38,14 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
@@ -223,9 +224,11 @@ public class WALReplayWithIndexWritesAndCompressedWALIT {
// initialize the region - this should replay the WALEdits from the WAL
region1.initialize();
+ org.apache.hadoop.hbase.client.Connection hbaseConn =
+ ConnectionFactory.createConnection(UTIL.getConfiguration());
// now check to ensure that we wrote to the index table
- HTable index = new HTable(UTIL.getConfiguration(), INDEX_TABLE_NAME);
+ Table index = hbaseConn.getTable(org.apache.hadoop.hbase.TableName.valueOf(INDEX_TABLE_NAME));
int indexSize = getKeyValueCount(index);
assertEquals("Index wasn't propertly updated from WAL replay!", 1, indexSize);
Get g = new Get(rowkey);
@@ -290,7 +293,7 @@ public class WALReplayWithIndexWritesAndCompressedWALIT {
}
@SuppressWarnings("deprecation")
-private int getKeyValueCount(HTable table) throws IOException {
+private int getKeyValueCount(Table table) throws IOException {
Scan scan = new Scan();
scan.setMaxVersions(Integer.MAX_VALUE - 1);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateQueryIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateQueryIT.java
index 6c85774..437ee4f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateQueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateQueryIT.java
@@ -35,8 +35,13 @@ import java.sql.ResultSet;
import java.util.Collection;
import java.util.Properties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.util.ByteUtil;
@@ -100,17 +105,20 @@ public class AggregateQueryIT extends BaseQueryIT {
byte[] tableNameBytes = Bytes.toBytes(tableName);
admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
- HTable htable = (HTable) conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(tableNameBytes);
- htable.clearRegionCache();
- int nRegions = htable.getRegionLocations().size();
+ Table htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(tableNameBytes);
+ Configuration configuration = conn.unwrap(PhoenixConnection.class).getQueryServices().getConfiguration();
+ org.apache.hadoop.hbase.client.Connection hbaseConn = ConnectionFactory.createConnection(configuration);
+ ((ClusterConnection)hbaseConn).clearRegionCache(TableName.valueOf(tableName));
+ RegionLocator regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName));
+ int nRegions = regionLocator.getAllRegionLocations().size();
admin.split(tableNameBytes, ByteUtil.concat(Bytes.toBytes(tenantId), Bytes.toBytes("00A3")));
int retryCount = 0;
do {
Thread.sleep(2000);
retryCount++;
//htable.clearRegionCache();
- } while (retryCount < 10 && htable.getRegionLocations().size() == nRegions);
- assertNotEquals(nRegions, htable.getRegionLocations().size());
+ } while (retryCount < 10 && regionLocator.getAllRegionLocations().size() == nRegions);
+ assertNotEquals(nRegions, regionLocator.getAllRegionLocations().size());
statement.setString(1, tenantId);
rs = statement.executeQuery();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java
index e2f3970..0d64be0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java
@@ -32,10 +32,10 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -286,8 +286,9 @@ public class FlappingLocalIndexIT extends BaseLocalIndexIT {
assertTrue(rs.next());
assertEquals(4, rs.getInt(1));
HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
- HTable indexTable = new HTable(admin.getConfiguration(),Bytes.toBytes(indexPhysicalTableName));
- Pair<byte[][], byte[][]> startEndKeys = indexTable.getStartEndKeys();
+ org.apache.hadoop.hbase.client.Connection hbaseConn = admin.getConnection();
+ Table indexTable = hbaseConn.getTable(TableName.valueOf(indexPhysicalTableName));
+ Pair<byte[][], byte[][]> startEndKeys = hbaseConn.getRegionLocator(TableName.valueOf(indexPhysicalTableName)).getStartEndKeys();
byte[][] startKeys = startEndKeys.getFirst();
byte[][] endKeys = startEndKeys.getSecond();
for (int i = 0; i < startKeys.length; i++) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/it/java/org/apache/phoenix/end2end/NamespaceSchemaMappingIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/NamespaceSchemaMappingIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/NamespaceSchemaMappingIT.java
index 0dfd550..d9a27f5 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/NamespaceSchemaMappingIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/NamespaceSchemaMappingIT.java
@@ -30,8 +30,8 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.QueryConstants;
@@ -71,14 +71,14 @@ public class NamespaceSchemaMappingIT extends ParallelStatsDisabledIT {
Put put = new Put(PVarchar.INSTANCE.toBytes(phoenixFullTableName));
put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES,
QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
- HTable phoenixSchematable = new HTable(admin.getConfiguration(), phoenixFullTableName);
+ Table phoenixSchematable = admin.getConnection().getTable(TableName.valueOf(phoenixFullTableName));
phoenixSchematable.put(put);
phoenixSchematable.close();
put = new Put(PVarchar.INSTANCE.toBytes(hbaseFullTableName));
put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES,
QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
phoenixSchematable.close();
- HTable namespaceMappedtable = new HTable(admin.getConfiguration(), hbaseFullTableName);
+ Table namespaceMappedtable = admin.getConnection().getTable(TableName.valueOf(hbaseFullTableName));
namespaceMappedtable.put(put);
namespaceMappedtable.close();
Properties props = new Properties();
@@ -92,7 +92,7 @@ public class NamespaceSchemaMappingIT extends ParallelStatsDisabledIT {
assertTrue(rs.next());
assertEquals(phoenixFullTableName, rs.getString(1));
- HTable metatable = new HTable(admin.getConfiguration(),
+ Table metatable = admin.getConnection().getTable(
SchemaUtil.getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES,
(conn.unwrap(PhoenixConnection.class).getQueryServices().getProps())));
Put p = new Put(SchemaUtil.getTableKey(null, schemaName, tableName));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/it/java/org/apache/phoenix/end2end/RowTimestampIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/RowTimestampIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/RowTimestampIT.java
index 458cc38..930092d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/RowTimestampIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/RowTimestampIT.java
@@ -32,10 +32,12 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.jdbc.PhoenixStatement;
@@ -143,14 +145,15 @@ public class RowTimestampIT extends ParallelStatsDisabledIT {
// verify that the timestamp of the keyvalues matches the ROW_TIMESTAMP column value
Scan scan = new Scan();
byte[] emptyKVQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(true).getFirst();
- HTable hTable = new HTable(getUtility().getConfiguration(), tableName);
+ org.apache.hadoop.hbase.client.Connection hbaseConn = ConnectionFactory.createConnection(getUtility().getConfiguration());
+ Table hTable = hbaseConn.getTable(TableName.valueOf(tableName));
ResultScanner resultScanner = hTable.getScanner(scan);
for (Result result : resultScanner) {
long timeStamp = result.getColumnLatest(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, emptyKVQualifier).getTimestamp();
assertEquals(rowTimestampDate.getTime(), timeStamp);
}
if (!mutable) {
- hTable = new HTable(getUtility().getConfiguration(), indexName);
+ hTable = hbaseConn.getTable(TableName.valueOf(indexName));
resultScanner = hTable.getScanner(scan);
for (Result result : resultScanner) {
long timeStamp = result.getColumnLatest(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, emptyKVQualifier).getTimestamp();
@@ -253,14 +256,15 @@ public class RowTimestampIT extends ParallelStatsDisabledIT {
// verify that the timestamp of the keyvalues matches the ROW_TIMESTAMP column value
Scan scan = new Scan();
byte[] emptyKVQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(true).getFirst();
- HTable hTable = new HTable(getUtility().getConfiguration(), tableName);
+ org.apache.hadoop.hbase.client.Connection hbaseConn = ConnectionFactory.createConnection(getUtility().getConfiguration());
+ Table hTable = hbaseConn.getTable(TableName.valueOf(tableName));
ResultScanner resultScanner = hTable.getScanner(scan);
for (Result result : resultScanner) {
long timeStamp = result.getColumnLatest(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, emptyKVQualifier).getTimestamp();
assertEquals(rowTimestampDate.getTime(), timeStamp);
}
if (!mutable) {
- hTable = new HTable(getUtility().getConfiguration(), indexName);
+ hTable = hbaseConn.getTable(TableName.valueOf(indexName));
resultScanner = hTable.getScanner(scan);
for (Result result : resultScanner) {
long timeStamp = result.getColumnLatest(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, emptyKVQualifier).getTimestamp();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/it/java/org/apache/phoenix/end2end/StoreNullsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StoreNullsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StoreNullsIT.java
index 378a9ed..63f127c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StoreNullsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StoreNullsIT.java
@@ -33,10 +33,12 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.expression.KeyValueColumnExpression;
import org.apache.phoenix.expression.SingleCellColumnExpression;
@@ -133,8 +135,9 @@ public class StoreNullsIT extends ParallelStatsDisabledIT {
rs1.next();
assertNull(rs1.getString(1));
rs1.next();
-
- HTable htable = new HTable(getUtility().getConfiguration(), dataTableName);
+ Table htable =
+ ConnectionFactory.createConnection(getUtility().getConfiguration()).getTable(
+ TableName.valueOf(dataTableName));
Scan s = new Scan();
s.setRaw(true);
ResultScanner scanner = htable.getScanner(s);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/it/java/org/apache/phoenix/end2end/UseSchemaIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UseSchemaIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UseSchemaIT.java
index 07ae77e..a578bd3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UseSchemaIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UseSchemaIT.java
@@ -30,9 +30,11 @@ import java.util.Properties;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
@@ -172,7 +174,7 @@ public class UseSchemaIT extends ParallelStatsDisabledIT {
Put put = new Put(PVarchar.INSTANCE.toBytes(fullTablename));
put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES,
QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
- HTable phoenixSchematable = new HTable(admin.getConfiguration(), fullTablename);
+ Table phoenixSchematable = admin.getConnection().getTable(TableName.valueOf(fullTablename));
phoenixSchematable.put(put);
phoenixSchematable.close();
conn.createStatement().execute("CREATE VIEW " + tableName + " (tablename VARCHAR PRIMARY KEY)");
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropColumnIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropColumnIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropColumnIT.java
index badb2a6..766e924 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropColumnIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropColumnIT.java
@@ -35,10 +35,10 @@ import java.util.Collection;
import java.util.Properties;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
import org.apache.phoenix.expression.KeyValueColumnExpression;
@@ -198,7 +198,7 @@ public class DropColumnIT extends ParallelStatsDisabledIT {
scan.setRaw(true);
scan.setStartRow(key);
scan.setStopRow(key);
- HTable table = (HTable) conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(dataTableName.getBytes());
+ Table table = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(dataTableName.getBytes());
ResultScanner results = table.getScanner(scan);
Result result = results.next();
assertNotNull(result);
@@ -209,7 +209,7 @@ public class DropColumnIT extends ParallelStatsDisabledIT {
// key value for v2 should have been deleted from the global index table
scan = new Scan();
scan.setRaw(true);
- table = (HTable) conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(indexTableName.getBytes());
+ table = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(indexTableName.getBytes());
results = table.getScanner(scan);
result = results.next();
assertNotNull(result);
@@ -220,7 +220,7 @@ public class DropColumnIT extends ParallelStatsDisabledIT {
scan = new Scan();
scan.setRaw(true);
scan.addFamily(QueryConstants.DEFAULT_LOCAL_INDEX_COLUMN_FAMILY_BYTES);
- table = (HTable) conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(dataTableName.getBytes());
+ table = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(dataTableName.getBytes());
results = table.getScanner(scan);
result = results.next();
assertNotNull(result);
@@ -248,7 +248,7 @@ public class DropColumnIT extends ParallelStatsDisabledIT {
byte[] key = Bytes.toBytes("a");
scan.setStartRow(key);
scan.setStopRow(key);
- HTable table = (HTable) conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(dataTableName.getBytes());
+ Table table = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(dataTableName.getBytes());
ResultScanner results = table.getScanner(scan);
Result result = results.next();
assertNotNull(result);
@@ -268,7 +268,7 @@ public class DropColumnIT extends ParallelStatsDisabledIT {
// key value for v2 should exist in the global index table
scan = new Scan();
scan.setRaw(true);
- table = (HTable) conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(indexTableName.getBytes());
+ table = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(indexTableName.getBytes());
results = table.getScanner(scan);
result = results.next();
assertNotNull(result);
@@ -288,7 +288,7 @@ public class DropColumnIT extends ParallelStatsDisabledIT {
scan = new Scan();
scan.setRaw(true);
scan.addFamily(QueryConstants.DEFAULT_LOCAL_INDEX_COLUMN_FAMILY_BYTES);
- table = (HTable) conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(dataTableName.getBytes());
+ table = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(dataTableName.getBytes());
results = table.getScanner(scan);
result = results.next();
assertNotNull(result);
@@ -379,7 +379,7 @@ public class DropColumnIT extends ParallelStatsDisabledIT {
// there should be a single row belonging to localIndexTableName2
Scan scan = new Scan();
scan.addFamily(QueryConstants.DEFAULT_LOCAL_INDEX_COLUMN_FAMILY_BYTES);
- HTable table = (HTable) conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(localIndexTablePhysicalName.getBytes());
+ Table table = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(localIndexTablePhysicalName.getBytes());
ResultScanner results = table.getScanner(scan);
Result result = results.next();
assertNotNull(result);
@@ -502,7 +502,7 @@ public class DropColumnIT extends ParallelStatsDisabledIT {
// scan the physical table and verify there is a single row for the second local index
Scan scan = new Scan();
- HTable table = (HTable) conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(viewIndexPhysicalTable);
+ Table table = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(viewIndexPhysicalTable);
ResultScanner results = table.getScanner(scan);
Result result = results.next();
assertNotNull(result);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexWithTableSchemaChangeIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexWithTableSchemaChangeIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexWithTableSchemaChangeIT.java
index aad7f73..7fc1a3a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexWithTableSchemaChangeIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexWithTableSchemaChangeIT.java
@@ -31,10 +31,10 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Properties;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -526,7 +526,7 @@ public class IndexWithTableSchemaChangeIT extends ParallelStatsDisabledIT {
// verify data table rows
Scan scan = new Scan();
- HTable table = (HTable) conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(dataTableFullName));
+ Table table = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(dataTableFullName));
ResultScanner results = table.getScanner(scan);
for (Result res : results) {
assertNull("Column value was not deleted",res.getValue(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V2")));
@@ -541,7 +541,7 @@ public class IndexWithTableSchemaChangeIT extends ParallelStatsDisabledIT {
// verify index table rows
scan = new Scan();
- table = (HTable) conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexTableFullName));
+ table = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexTableFullName));
results = table.getScanner(scan);
for (Result res : results) {
assertNull("Column value was not deleted",res.getValue(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("0:V2")));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
index 6ea96a9..615d2aa 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
@@ -44,8 +44,9 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
@@ -174,11 +175,12 @@ public class LocalIndexIT extends BaseLocalIndexIT {
HTableDescriptor htd = admin
.getTableDescriptor(Bytes.toBytes(indexPhysicalTableName));
assertEquals(IndexRegionSplitPolicy.class.getName(), htd.getValue(HTableDescriptor.SPLIT_POLICY));
- try (HTable userTable = new HTable(admin.getConfiguration(),
- SchemaUtil.getPhysicalTableName(tableName.getBytes(), isNamespaceMapped))) {
- try (HTable indexTable = new HTable(admin.getConfiguration(), Bytes.toBytes(indexPhysicalTableName))) {
- assertArrayEquals("Both user table and index table should have same split keys.",
- userTable.getStartKeys(), indexTable.getStartKeys());
+ try(org.apache.hadoop.hbase.client.Connection c = ConnectionFactory.createConnection(admin.getConfiguration())) {
+ try (RegionLocator userTable= c.getRegionLocator(SchemaUtil.getPhysicalTableName(tableName.getBytes(), isNamespaceMapped))) {
+ try (RegionLocator indxTable = c.getRegionLocator(TableName.valueOf(indexPhysicalTableName))) {
+ assertArrayEquals("Both user table and index table should have same split keys.",
+ userTable.getStartKeys(), indxTable.getStartKeys());
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexReplicationIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexReplicationIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexReplicationIT.java
index 48265ed..9c6923c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexReplicationIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexReplicationIT.java
@@ -42,11 +42,12 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
@@ -245,7 +246,8 @@ public class MutableIndexReplicationIT extends BaseTest {
// lookup tables. For right now, we just go through an HTable
LOG.info("Looking up tables in replication target");
TableName[] tables = admin2.listTableNames();
- HTable remoteTable = new HTable(utility2.getConfiguration(), tables[0]);
+ org.apache.hadoop.hbase.client.Connection hbaseConn = ConnectionFactory.createConnection(utility2.getConfiguration());
+ Table remoteTable = hbaseConn.getTable(tables[0]);
for (int i = 0; i < REPLICATION_RETRIES; i++) {
if (i >= REPLICATION_RETRIES - 1) {
fail("Waited too much time for put replication on table " + remoteTable
@@ -261,7 +263,7 @@ public class MutableIndexReplicationIT extends BaseTest {
remoteTable.close();
}
- private boolean ensureAnyRows(HTable remoteTable) throws IOException {
+ private boolean ensureAnyRows(Table remoteTable) throws IOException {
Scan scan = new Scan();
scan.setRaw(true);
ResultScanner scanner = remoteTable.getScanner(scan);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerIndexRpcControllerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerIndexRpcControllerFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerIndexRpcControllerFactory.java
index 5a7f75f..47b6c40 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerIndexRpcControllerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerIndexRpcControllerFactory.java
@@ -22,12 +22,11 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
/**
- * {@link RpcControllerFactory} that should only be used when creating {@link HTable} for
+ * {@link RpcControllerFactory} that should only be used when creating {@link org.apache.hadoop.hbase.client.Table} for
* making remote RPCs to the region servers hosting global mutable index table regions.
* This controller factory shouldn't be globally configured anywhere and is meant to be used
* only internally by Phoenix indexing code.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerMetadataRpcControllerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerMetadataRpcControllerFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerMetadataRpcControllerFactory.java
index 37f3927..3f63ac3 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerMetadataRpcControllerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerMetadataRpcControllerFactory.java
@@ -22,12 +22,11 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
/**
- * {@link RpcControllerFactory} that should only be used when creating {@link HTable} for
+ * {@link RpcControllerFactory} that should only be used when creating {@link org.apache.hadoop.hbase.client.Table} for
* making remote RPCs to the region servers hosting Phoenix SYSTEM tables.
*/
public class InterRegionServerMetadataRpcControllerFactory extends RpcControllerFactory {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java
index 88154a7..992e65f 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java
@@ -36,10 +36,11 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -98,10 +99,11 @@ public class IndexHalfStoreFileReaderGenerator extends BaseRegionObserver {
}
if(scvf != null) scan.setFilter(scvf);
byte[] regionStartKeyInHFile = null;
- HTable metaTable = null;
+ Connection connection = ctx.getEnvironment().getConnection();
+ Table metaTable = null;
PhoenixConnection conn = null;
try {
- metaTable = new HTable(ctx.getEnvironment().getConfiguration(), TableName.META_TABLE_NAME);
+ metaTable = connection.getTable(TableName.META_TABLE_NAME); // handle to hbase:meta from the coprocessor environment's connection
ResultScanner scanner = null;
Result result = null;
try {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index e68f95e..ab6309c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -57,7 +57,6 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTable.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTable.java
index 444bb5d..15d5cf6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTable.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.execute;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
@@ -294,4 +295,29 @@ public class DelegateHTable implements Table {
public void setWriteRpcTimeout(int writeRpcTimeout) {
delegate.setWriteRpcTimeout(writeRpcTimeout);
}
+
+ @Override
+ public boolean[] exists(List<Get> gets) throws IOException {
+ return delegate.exists(gets); // forward to the same-named method on the wrapped table, not to existsAll
+ }
+
+ @Override
+ public long getRpcTimeout(TimeUnit unit) {
+ return delegate.getRpcTimeout(unit); // pass the requested unit through; the no-arg call dropped it
+ }
+
+ @Override
+ public long getReadRpcTimeout(TimeUnit unit) {
+ return delegate.getReadRpcTimeout(unit);
+ }
+
+ @Override
+ public long getWriteRpcTimeout(TimeUnit unit) {
+ return delegate.getWriteRpcTimeout(unit);
+ }
+
+ @Override
+ public long getOperationTimeout(TimeUnit unit) {
+ return delegate.getOperationTimeout(unit);
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index f717647..22e5c3c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -41,7 +41,11 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
@@ -289,13 +293,15 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
job.setOutputValueClass(KeyValue.class);
job.setReducerClass(FormatToKeyValueReducer.class);
byte[][] splitKeysBeforeJob = null;
- HTable table = null;
+ org.apache.hadoop.hbase.client.Connection hbaseConn =
+ ConnectionFactory.createConnection(job.getConfiguration());
+ RegionLocator regionLocator = null;
if(hasLocalIndexes) {
try{
- table = new HTable(job.getConfiguration(), qualifiedTableName);
- splitKeysBeforeJob = table.getRegionLocator().getStartKeys();
+ regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(qualifiedTableName));
+ splitKeysBeforeJob = regionLocator.getStartKeys();
} finally {
- if(table != null )table.close();
+ if(regionLocator != null )regionLocator.close();
}
}
MultiHfileOutputFormat.configureIncrementalLoad(job, tablesToBeLoaded);
@@ -315,8 +321,8 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
if (success) {
if (hasLocalIndexes) {
try {
- table = new HTable(job.getConfiguration(), qualifiedTableName);
- if(!IndexUtil.matchingSplitKeys(splitKeysBeforeJob, table.getRegionLocator().getStartKeys())) {
+ regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(qualifiedTableName));
+ if(!IndexUtil.matchingSplitKeys(splitKeysBeforeJob, regionLocator.getStartKeys())) {
LOG.error("The table "
+ qualifiedTableName
+ " has local indexes and there is split key mismatch before and"
@@ -325,7 +331,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
return -1;
}
} finally {
- if (table != null) table.close();
+ if (regionLocator != null) regionLocator.close();
}
}
LOG.info("Loading HFiles from {}", outputPath);
@@ -350,9 +356,10 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
String tableName = table.getPhysicalName();
Path tableOutputPath = CsvBulkImportUtil.getOutputPath(outputPath, tableName);
- HTable htable = new HTable(conf,tableName);
+ org.apache.hadoop.hbase.client.Connection hbaseConn = ConnectionFactory.createConnection(conf);
+ Table htable = hbaseConn.getTable(TableName.valueOf(tableName));
LOG.info("Loading HFiles for {} from {}", tableName , tableOutputPath);
- loader.doBulkLoad(tableOutputPath, htable);
+ loader.doBulkLoad(tableOutputPath, hbaseConn.getAdmin(), htable, hbaseConn.getRegionLocator(TableName.valueOf(tableName)));
LOG.info("Incremental load complete for table=" + tableName);
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java
index 30f21ce..c888b7d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java
@@ -40,7 +40,9 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
@@ -650,6 +652,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce
* @param tablesToBeLoaded
* @throws IOException
*/
+ @SuppressWarnings("deprecation")
public static void configureIncrementalLoad(Job job, List<TargetTableRef> tablesToBeLoaded) throws IOException {
Configuration conf = job.getConfiguration();
@@ -662,13 +665,16 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce
Set<TableRowkeyPair> tablesStartKeys = Sets.newTreeSet();
for(TargetTableRef table : tablesToBeLoaded) {
final String tableName = table.getPhysicalName();
- try(HTable htable = new HTable(conf,tableName);){
- Set<TableRowkeyPair> startKeys = getRegionStartKeys(tableName , htable.getRegionLocator());
+ try(Connection hbaseConn = ConnectionFactory.createConnection(conf);){
+ Set<TableRowkeyPair> startKeys =
+ getRegionStartKeys(tableName,
+ hbaseConn.getRegionLocator(TableName.valueOf(tableName)));
tablesStartKeys.addAll(startKeys);
- String compressionConfig = configureCompression(htable.getTableDescriptor());
- String bloomTypeConfig = configureBloomType(htable.getTableDescriptor());
- String blockSizeConfig = configureBlockSize(htable.getTableDescriptor());
- String blockEncodingConfig = configureDataBlockEncoding(htable.getTableDescriptor());
+ HTableDescriptor tableDescriptor = hbaseConn.getTable(TableName.valueOf(tableName)).getTableDescriptor();
+ String compressionConfig = configureCompression(tableDescriptor);
+ String bloomTypeConfig = configureBloomType(tableDescriptor);
+ String blockSizeConfig = configureBlockSize(tableDescriptor);
+ String blockEncodingConfig = configureDataBlockEncoding(tableDescriptor);
Map<String,String> tableConfigs = Maps.newHashMap();
if(StringUtils.isNotBlank(compressionConfig)) {
tableConfigs.put(COMPRESSION_FAMILIES_CONF_KEY, compressionConfig);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index af080b4..cf13075 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -48,11 +48,10 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
@@ -479,7 +478,8 @@ public class IndexTool extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
Connection connection = null;
- HTable htable = null;
+ Table htable = null;
+ RegionLocator regionLocator = null;
try {
CommandLine cmdLine = null;
try {
@@ -508,11 +508,14 @@ public class IndexTool extends Configured implements Tool {
}
pindexTable = PhoenixRuntime.getTable(connection, schemaName != null && !schemaName.isEmpty()
? SchemaUtil.getQualifiedTableName(schemaName, indexTable) : indexTable);
- htable = (HTable)connection.unwrap(PhoenixConnection.class).getQueryServices()
+ htable = connection.unwrap(PhoenixConnection.class).getQueryServices()
.getTable(pindexTable.getPhysicalName().getBytes());
+ regionLocator =
+ ConnectionFactory.createConnection(configuration).getRegionLocator(
+ TableName.valueOf(pindexTable.getPhysicalName().getBytes()));
if (IndexType.LOCAL.equals(pindexTable.getIndexType())) {
isLocalIndexBuild = true;
- splitKeysBeforeJob = htable.getRegionLocator().getStartKeys();
+ splitKeysBeforeJob = regionLocator.getStartKeys();
}
}
@@ -539,11 +542,12 @@ public class IndexTool extends Configured implements Tool {
if (result) {
if (!useDirectApi && indexTable != null) {
if (isLocalIndexBuild) {
- validateSplitForLocalIndex(splitKeysBeforeJob, htable);
+ validateSplitForLocalIndex(splitKeysBeforeJob, regionLocator);
}
LOG.info("Loading HFiles from {}", outputPath);
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(configuration);
- loader.doBulkLoad(outputPath, htable);
+ loader.doBulkLoad(outputPath, connection.unwrap(PhoenixConnection.class)
+ .getQueryServices().getAdmin(), htable, regionLocator);
htable.close();
// Without direct API, we need to update the index state to ACTIVE from client.
IndexToolUtil.updateIndexState(connection, qDataTable, indexTable, PIndexState.ACTIVE);
@@ -566,6 +570,9 @@ public class IndexTool extends Configured implements Tool {
if (htable != null) {
htable.close();
}
+ if(regionLocator != null) {
+ regionLocator.close();
+ }
} catch (SQLException sqle) {
LOG.error("Failed to close connection ", sqle.getMessage());
throw new RuntimeException("Failed to close connection");
@@ -575,9 +582,9 @@ public class IndexTool extends Configured implements Tool {
- private boolean validateSplitForLocalIndex(byte[][] splitKeysBeforeJob, HTable htable) throws Exception {
+ private boolean validateSplitForLocalIndex(byte[][] splitKeysBeforeJob, RegionLocator regionLocator) throws Exception {
if (splitKeysBeforeJob != null
- && !IndexUtil.matchingSplitKeys(splitKeysBeforeJob, htable.getRegionLocator().getStartKeys())) {
+ && !IndexUtil.matchingSplitKeys(splitKeysBeforeJob, regionLocator.getStartKeys())) {
String errMsg = "The index to build is local index and the split keys are not matching"
+ " before and after running the job. Please rerun the job otherwise"
+ " there may be inconsistencies between actual data and index data";
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java
index 1293a21..ef3a8fc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java
@@ -117,27 +117,27 @@ public interface PhoenixTransactionalTable extends Table {
public void delete(List<Delete> deletes) throws IOException;
/**
- * Delegates to {@link HTable#setAutoFlush(boolean autoFlush)}
+ * Delegates to {@link Table#setAutoFlush(boolean autoFlush)}
*/
public void setAutoFlush(boolean autoFlush);
/**
- * Delegates to {@link HTable#isAutoFlush()}
+ * Delegates to {@link Table#isAutoFlush()}
*/
public boolean isAutoFlush();
/**
- * Delegates to see HTable.getWriteBufferSize()
+ * Delegates to see Table.getWriteBufferSize()
*/
public long getWriteBufferSize();
/**
- * Delegates to see HTable.setWriteBufferSize()
+ * Delegates to see Table.setWriteBufferSize()
*/
public void setWriteBufferSize(long writeBufferSize) throws IOException;
/**
- * Delegates to see HTable.flushCommits()
+ * Delegates to see Table.flushCommits()
*/
public void flushCommits() throws IOException;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/IndexTestingUtils.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/IndexTestingUtils.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/IndexTestingUtils.java
index 7fa9c8e..5868103 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/IndexTestingUtils.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/IndexTestingUtils.java
@@ -28,10 +28,10 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.util.Bytes;
@@ -63,9 +63,9 @@ public class IndexTestingUtils {
* @throws IOException
*/
@SuppressWarnings("javadoc")
- public static void verifyIndexTableAtTimestamp(HTable index1, List<KeyValue> expected,
+ public static void verifyIndexTableAtTimestamp(Table index1, List<KeyValue> expected,
long start, long end, byte[] startKey, byte[] endKey) throws IOException {
- LOG.debug("Scanning " + Bytes.toString(index1.getTableName()) + " between times (" + start
+ LOG.debug("Scanning " + index1.getName().getNameAsString() + " between times (" + start
+ ", " + end + "] and keys: [" + Bytes.toString(startKey) + ", " + Bytes.toString(endKey)
+ "].");
Scan s = new Scan(startKey, endKey);
@@ -82,12 +82,12 @@ public class IndexTestingUtils {
assertEquals("Didn't get the expected kvs from the index table!", expected, received);
}
- public static void verifyIndexTableAtTimestamp(HTable index1, List<KeyValue> expected, long ts,
+ public static void verifyIndexTableAtTimestamp(Table index1, List<KeyValue> expected, long ts,
byte[] startKey) throws IOException {
IndexTestingUtils.verifyIndexTableAtTimestamp(index1, expected, ts, startKey, HConstants.EMPTY_END_ROW);
}
- public static void verifyIndexTableAtTimestamp(HTable index1, List<KeyValue> expected, long start,
+ public static void verifyIndexTableAtTimestamp(Table index1, List<KeyValue> expected, long start,
byte[] startKey, byte[] endKey) throws IOException {
verifyIndexTableAtTimestamp(index1, expected, start, start + 1, startKey, endKey);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java
index faee74a..62cb24e 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java
@@ -39,13 +39,15 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -192,9 +194,9 @@ public class TestWALRecoveryCaching {
// load some data into the table
Put p = new Put(Bytes.toBytes("row"));
p.addColumn(family, qual, Bytes.toBytes("value"));
- HTable primary = new HTable(conf, testTable.getTableName());
+ Connection hbaseConn = ConnectionFactory.createConnection(conf);
+ Table primary = hbaseConn.getTable(org.apache.hadoop.hbase.TableName.valueOf(testTable.getTableName()));
primary.put(p);
- primary.flushCommits();
// turn on the recovery latch
allowIndexTableToRecover = new CountDownLatch(1);
@@ -236,7 +238,6 @@ public class TestWALRecoveryCaching {
Put p2 = new Put(p.getRow());
p2.addColumn(nonIndexedFamily, Bytes.toBytes("Not indexed"), Bytes.toBytes("non-indexed value"));
primary.put(p2);
- primary.flushCommits();
// make sure that we actually failed the write once (within a 5 minute window)
assertTrue("Didn't find an error writing to index table within timeout!",
@@ -245,7 +246,7 @@ public class TestWALRecoveryCaching {
// scan the index to make sure it has the one entry, (that had to be replayed from the WAL,
// since we hard killed the server)
Scan s = new Scan();
- HTable index = new HTable(conf, getIndexTableName());
+ Table index = hbaseConn.getTable(org.apache.hadoop.hbase.TableName.valueOf(getIndexTableName()));
ResultScanner scanner = index.getScanner(s);
int count = 0;
for (Result r : scanner) {