You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ra...@apache.org on 2017/11/08 11:35:50 UTC
phoenix git commit: PHOENIX-4350 Replace deprecated or changed Region
methods with new APIs (Rajeshbabu)
Repository: phoenix
Updated Branches:
refs/heads/5.x-HBase-2.0 113904275 -> 62027bff1
PHOENIX-4350 Replace deprecated or changed Region methods with new APIs (Rajeshbabu)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/62027bff
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/62027bff
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/62027bff
Branch: refs/heads/5.x-HBase-2.0
Commit: 62027bff132dda23e6f7ae30334f191e68072ba2
Parents: 1139042
Author: Rajeshbabu Chintaguntla <ra...@apache.org>
Authored: Wed Nov 8 17:05:31 2017 +0530
Committer: Rajeshbabu Chintaguntla <ra...@apache.org>
Committed: Wed Nov 8 17:05:31 2017 +0530
----------------------------------------------------------------------
...ReplayWithIndexWritesAndCompressedWALIT.java | 8 ++-
.../DataTableLocalIndexRegionScanner.java | 3 +-
.../coprocessor/MetaDataEndpointImpl.java | 57 ++++++++------------
.../coprocessor/SequenceRegionObserver.java | 21 +++-----
.../UngroupedAggregateRegionObserver.java | 30 ++++++-----
.../hbase/index/IndexRegionSplitPolicy.java | 25 ++++-----
.../org/apache/phoenix/hbase/index/Indexer.java | 3 +-
.../stats/DefaultStatisticsCollector.java | 2 +-
.../java/org/apache/phoenix/util/IndexUtil.java | 4 +-
.../org/apache/phoenix/util/ServerUtil.java | 20 +++++++
pom.xml | 2 +-
11 files changed, 85 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/62027bff/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALReplayWithIndexWritesAndCompressedWALIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALReplayWithIndexWritesAndCompressedWALIT.java b/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALReplayWithIndexWritesAndCompressedWALIT.java
index 5ca6de9..dfff8fe 100644
--- a/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALReplayWithIndexWritesAndCompressedWALIT.java
+++ b/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALReplayWithIndexWritesAndCompressedWALIT.java
@@ -183,15 +183,14 @@ public class WALReplayWithIndexWritesAndCompressedWALIT {
CoveredColumnIndexSpecifierBuilder builder = new CoveredColumnIndexSpecifierBuilder();
builder.addIndexGroup(fam1);
builder.build(htd);
+ WALFactory walFactory = new WALFactory(this.conf, null, "localhost,1234");
+ WAL wal = createWAL(this.conf, walFactory);
// create the region + its WAL
- HRegion region0 = HRegion.createHRegion(hri, hbaseRootDir, this.conf, htd); // FIXME: Uses private type
+ HRegion region0 = HRegion.createHRegion(hri, hbaseRootDir, this.conf, htd, wal); // FIXME: Uses private type
region0.close();
region0.getWAL().close();
- WALFactory walFactory = new WALFactory(this.conf, null, "localhost,1234");
-
- WAL wal = createWAL(this.conf, walFactory);
HRegionServer mockRS = Mockito.mock(HRegionServer.class);
// mock out some of the internals of the RSS, so we can run CPs
when(mockRS.getWAL(null)).thenReturn(wal);
@@ -202,7 +201,6 @@ public class WALReplayWithIndexWritesAndCompressedWALIT {
when(mockRS.getServerName()).thenReturn(mockServerName);
HRegion region = spy(new HRegion(basedir, wal, this.fs, this.conf, hri, htd, mockRS));
region.initialize();
- when(region.getSequenceId()).thenReturn(0l);
//make an attempted write to the primary that should also be indexed
http://git-wip-us.apache.org/repos/asf/phoenix/blob/62027bff/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/DataTableLocalIndexRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/DataTableLocalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/DataTableLocalIndexRegionScanner.java
index 64d4ac4..eee6c93 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/DataTableLocalIndexRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/DataTableLocalIndexRegionScanner.java
@@ -94,8 +94,7 @@ public class DataTableLocalIndexRegionScanner extends DelegateRegionScanner {
boolean next = super.next(dataTableResults);
addMutations(dataTableResults);
if (ServerUtil.readyToCommit(mutationList.size(), mutationList.byteSize(), maxBatchSize, maxBatchSizeBytes)||!next) {
- region.batchMutate(mutationList.toArray(new Mutation[mutationList.size()]), HConstants.NO_NONCE,
- HConstants.NO_NONCE);
+ region.batchMutate(mutationList.toArray(new Mutation[mutationList.size()]));
mutationList.clear();
}
return next;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/62027bff/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index a42e1b7..c2124d0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -1405,7 +1405,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
// Place a lock using key for the table to be created
byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaName, tableName);
try {
- acquireLock(region, tableKey, locks);
+ ServerUtil.acquireLock(region, tableKey, locks);
// If the table key resides outside the region, return without doing anything
MetaDataMutationResult result = checkTableKeyInRegion(tableKey, region);
@@ -1423,7 +1423,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
// For an index on view, the view header row needs to be locked.
result = checkTableKeyInRegion(parentTableKey, region);
if (result == null) {
- acquireLock(region, parentTableKey, locks);
+ ServerUtil.acquireLock(region, parentTableKey, locks);
parentCacheKey = new ImmutableBytesPtr(parentTableKey);
parentTable = loadTable(env, parentTableKey, parentCacheKey, clientTimeStamp,
clientTimeStamp, clientVersion);
@@ -1632,7 +1632,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
done.run(builder.build());
return;
} finally {
- region.releaseRowLocks(locks);
+ ServerUtil.releaseRowLocks(locks);
}
} catch (Throwable t) {
logger.error("createTable failed", t);
@@ -1648,16 +1648,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
QueryServicesOptions.DEFAULT_MAX_INDEXES_PER_TABLE);
}
- private static RowLock acquireLock(Region region, byte[] key, List<RowLock> locks)
- throws IOException {
- RowLock rowLock = region.getRowLock(key, false);
- if (rowLock == null) {
- throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key));
- }
- locks.add(rowLock);
- return rowLock;
- }
-
private static final byte[] CHILD_TABLE_BYTES = new byte[] {PTable.LinkType.CHILD_TABLE.getSerializedValue()};
@@ -1883,7 +1873,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
done.run(MetaDataMutationResult.toProto(result));
return;
} finally {
- region.releaseRowLocks(locks);
+ ServerUtil.releaseRowLocks(locks);
}
} catch (Throwable t) {
logger.error("dropTable failed", t);
@@ -1962,7 +1952,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
byte[] viewKey = SchemaUtil.getTableKey(viewTenantId, viewSchemaName, viewName);
Delete delete = new Delete(viewKey, clientTimeStamp);
rowsToDelete.add(delete);
- acquireLock(region, viewKey, locks);
+ ServerUtil.acquireLock(region, viewKey, locks);
MetaDataMutationResult result = doDropTable(viewKey, viewTenantId, viewSchemaName,
viewName, null, PTableType.VIEW, rowsToDelete, invalidateList, locks,
tableNamesToDelete, sharedTablesToDelete, false, clientVersion);
@@ -2025,7 +2015,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
// of the client.
Delete delete = new Delete(indexKey, clientTimeStamp);
rowsToDelete.add(delete);
- acquireLock(region, indexKey, locks);
+ ServerUtil.acquireLock(region, indexKey, locks);
MetaDataMutationResult result =
doDropTable(indexKey, tenantId, schemaName, indexName, tableName, PTableType.INDEX,
rowsToDelete, invalidateList, locks, tableNamesToDelete, sharedTablesToDelete, false, clientVersion);
@@ -2061,7 +2051,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
}
List<RowLock> locks = Lists.newArrayList();
try {
- acquireLock(region, key, locks);
+ ServerUtil.acquireLock(region, key, locks);
ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
List<ImmutableBytesPtr> invalidateList = new ArrayList<ImmutableBytesPtr>();
invalidateList.add(cacheKey);
@@ -2155,7 +2145,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, currentTime, table);
}
} finally {
- region.releaseRowLocks(locks);
+ ServerUtil.releaseRowLocks(locks);
}
} catch (Throwable t) {
ServerUtil.throwIOException(SchemaUtil.getTableName(schemaName, tableName), t);
@@ -2356,7 +2346,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
byte[] viewKey = SchemaUtil.getTableKey(tenantId, schema, table);
// lock the rows corresponding to views so that no other thread can modify the view meta-data
- RowLock viewRowLock = acquireLock(region, viewKey, locks);
+ RowLock viewRowLock = ServerUtil.acquireLock(region, viewKey, locks);
PTable view = doGetTable(viewKey, clientTimeStamp, viewRowLock, clientVersion);
ColumnOrdinalPositionUpdateList ordinalPositionList = new ColumnOrdinalPositionUpdateList();
@@ -2681,7 +2671,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
// lock the rows corresponding to views so that no other thread can modify the view
// meta-data
- RowLock viewRowLock = acquireLock(region, viewKey, locks);
+ RowLock viewRowLock = ServerUtil.acquireLock(region, viewKey, locks);
PTable view = doGetTable(viewKey, clientTimeStamp, viewRowLock, clientVersion);
ColumnOrdinalPositionUpdateList ordinalPositionList =
@@ -3223,10 +3213,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
if(functionsAvailable.size() == numFunctions) return functionsAvailable;
return null;
} finally {
- for (Region.RowLock lock : rowLocks) {
- lock.release();
- }
- rowLocks.clear();
+ ServerUtil.releaseRowLocks(rowLocks);
}
}
@@ -3380,7 +3367,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
// Since we're dropping the index, lock it to ensure
// that a change in index state doesn't
// occur while we're dropping it.
- acquireLock(region, indexKey, locks);
+ ServerUtil.acquireLock(region, indexKey, locks);
// Drop the index table. The doDropTable will expand
// this to all of the table rows and invalidate the
// index table
@@ -3772,7 +3759,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
long clientTimeStamp = request.getClientTimestamp();
List<RowLock> locks = Lists.newArrayList();
try {
- acquireLock(region, lockKey, locks);
+ ServerUtil.acquireLock(region, lockKey, locks);
// Get as of latest timestamp so we can detect if we have a
// newer schema that already
// exists without making an additional query
@@ -3802,7 +3789,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
done.run(builder.build());
return;
} finally {
- region.releaseRowLocks(locks);
+ ServerUtil.releaseRowLocks(locks);
}
}
@@ -3876,7 +3863,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
List<RowLock> locks = Lists.newArrayList();
long clientTimeStamp = MetaDataUtil.getClientTimeStamp(functionMetaData);
try {
- acquireLock(region, lockKey, locks);
+ ServerUtil.acquireLock(region, lockKey, locks);
// Get as of latest timestamp so we can detect if we have a newer function that already
// exists without making an additional query
ImmutableBytesPtr cacheKey = new FunctionBytesPtr(lockKey);
@@ -3919,7 +3906,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
done.run(builder.build());
return;
} finally {
- region.releaseRowLocks(locks);
+ ServerUtil.releaseRowLocks(locks);
}
} catch (Throwable t) {
logger.error("createFunction failed", t);
@@ -3948,7 +3935,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
List<RowLock> locks = Lists.newArrayList();
long clientTimeStamp = MetaDataUtil.getClientTimeStamp(functionMetaData);
try {
- acquireLock(region, lockKey, locks);
+ ServerUtil.acquireLock(region, lockKey, locks);
List<byte[]> keys = new ArrayList<byte[]>(1);
keys.add(lockKey);
List<ImmutableBytesPtr> invalidateList = new ArrayList<ImmutableBytesPtr>();
@@ -3971,7 +3958,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
done.run(MetaDataMutationResult.toProto(result));
return;
} finally {
- region.releaseRowLocks(locks);
+ ServerUtil.releaseRowLocks(locks);
}
} catch (Throwable t) {
logger.error("dropFunction failed", t);
@@ -4046,7 +4033,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
List<RowLock> locks = Lists.newArrayList();
long clientTimeStamp = MetaDataUtil.getClientTimeStamp(schemaMutations);
try {
- acquireLock(region, lockKey, locks);
+ ServerUtil.acquireLock(region, lockKey, locks);
// Get as of latest timestamp so we can detect if we have a newer schema that already exists without
// making an additional query
ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(lockKey);
@@ -4086,7 +4073,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
done.run(builder.build());
return;
} finally {
- region.releaseRowLocks(locks);
+ ServerUtil.releaseRowLocks(locks);
}
} catch (Throwable t) {
logger.error("Creating the schema" + schemaName + "failed", t);
@@ -4110,7 +4097,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
List<RowLock> locks = Lists.newArrayList();
long clientTimeStamp = MetaDataUtil.getClientTimeStamp(schemaMetaData);
try {
- acquireLock(region, lockKey, locks);
+ ServerUtil.acquireLock(region, lockKey, locks);
List<ImmutableBytesPtr> invalidateList = new ArrayList<ImmutableBytesPtr>(1);
result = doDropSchema(clientTimeStamp, schemaName, lockKey, schemaMetaData, invalidateList);
if (result.getMutationCode() != MutationCode.SCHEMA_ALREADY_EXISTS) {
@@ -4129,7 +4116,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
done.run(MetaDataMutationResult.toProto(result));
return;
} finally {
- region.releaseRowLocks(locks);
+ ServerUtil.releaseRowLocks(locks);
}
} catch (Throwable t) {
logger.error("drop schema failed:", t);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/62027bff/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
index 8ef5e80..c004818 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
@@ -89,15 +89,6 @@ public class SequenceRegionObserver implements RegionObserver {
QueryConstants.EMPTY_COLUMN_BYTES, timestamp, errorCodeBuf)));
}
- private static void acquireLock(Region region, byte[] key, List<RowLock> locks)
- throws IOException {
- RowLock rowLock = region.getRowLock(key, false);
- if (rowLock == null) {
- throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key));
- }
- locks.add(rowLock);
- }
-
/**
* Use PreIncrement hook of BaseRegionObserver to overcome deficiencies in Increment
* implementation (HBASE-10254):
@@ -121,7 +112,7 @@ public class SequenceRegionObserver implements RegionObserver {
TimeRange tr = increment.getTimeRange();
region.startRegionOperation();
try {
- acquireLock(region, row, locks);
+ ServerUtil.acquireLock(region, row, locks);
try {
long maxTimestamp = tr.getMax();
boolean validateOnly = true;
@@ -278,11 +269,11 @@ public class SequenceRegionObserver implements RegionObserver {
}
// update the KeyValues on the server
Mutation[] mutations = new Mutation[]{put};
- region.batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);
+ region.batchMutate(mutations);
// return a Result with the updated KeyValues
return Result.create(cells);
} finally {
- region.releaseRowLocks(locks);
+ ServerUtil.releaseRowLocks(locks);
}
} catch (Throwable t) {
ServerUtil.throwIOException("Increment of sequence " + Bytes.toStringBinary(row), t);
@@ -378,7 +369,7 @@ public class SequenceRegionObserver implements RegionObserver {
List<RowLock> locks = Lists.newArrayList();
region.startRegionOperation();
try {
- acquireLock(region, row, locks);
+ ServerUtil.acquireLock(region, row, locks);
try {
byte[] family = CellUtil.cloneFamily(keyValue);
byte[] qualifier = CellUtil.cloneQualifier(keyValue);
@@ -428,7 +419,7 @@ public class SequenceRegionObserver implements RegionObserver {
}
}
Mutation[] mutations = new Mutation[]{m};
- region.batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);
+ region.batchMutate(mutations);
long serverTimestamp = MetaDataUtil.getClientTimeStamp(m);
// Return result with single KeyValue. The only piece of information
// the client cares about is the timestamp, which is the timestamp of
@@ -436,7 +427,7 @@ public class SequenceRegionObserver implements RegionObserver {
return Result.create(Collections.singletonList(
(Cell)KeyValueUtil.newKeyValue(row, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, serverTimestamp, SUCCESS_VALUE)));
} finally {
- region.releaseRowLocks(locks);
+ ServerUtil.releaseRowLocks(locks);
}
} catch (Throwable t) {
ServerUtil.throwIOException("Increment of sequence " + Bytes.toStringBinary(row), t);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/62027bff/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index a770aa0..e68f95e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -54,12 +54,14 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -252,7 +254,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
// TODO: should we use the one that is all or none?
logger.debug("Committing bactch of " + mutations.size() + " mutations for " + region.getRegionInfo().getTable().getNameAsString());
- region.batchMutate(mutations.toArray(mutationArray), HConstants.NO_NONCE, HConstants.NO_NONCE);
+ region.batchMutate(mutations.toArray(mutationArray));
}
private void setIndexAndTransactionProperties(List<Mutation> mutations, byte[] indexUUID, byte[] indexMaintainersPtr, byte[] txState, boolean useIndexProto) {
@@ -269,14 +271,14 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
}
- private void commitBatchWithHTable(HTable table, List<Mutation> mutations) throws IOException {
+ private void commitBatchWithHTable(Table table, List<Mutation> mutations) throws IOException {
if (mutations.isEmpty()) {
return;
}
logger.debug("Committing batch of " + mutations.size() + " mutations for " + table);
try {
- table.batch(mutations);
+ table.batch(mutations, null);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
@@ -412,13 +414,15 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
byte[] deleteCQ = null;
byte[] deleteCF = null;
byte[] emptyCF = null;
- HTable targetHTable = null;
+ Table targetHTable = null;
boolean isPKChanging = false;
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
if (upsertSelectTable != null) {
isUpsert = true;
projectedTable = deserializeTable(upsertSelectTable);
- targetHTable = new HTable(upsertSelectConfig, projectedTable.getPhysicalName().getBytes());
+ targetHTable =
+ ConnectionFactory.createConnection(upsertSelectConfig).getTable(
+ TableName.valueOf(projectedTable.getPhysicalName().getBytes()));
selectExpressions = deserializeExpressions(scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS));
values = new byte[projectedTable.getPKColumns().size()][];
isPKChanging = ExpressionUtil.isPkPositionChanging(new TableRef(projectedTable), selectExpressions);
@@ -457,7 +461,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
MutationList mutations = new MutationList();
boolean needToWrite = false;
Configuration conf = env.getConfiguration();
- long flushSize = region.getTableDesc().getMemStoreFlushSize();
+ long flushSize = region.getTableDescriptor().getMemStoreFlushSize();
if (flushSize <= 0) {
flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
@@ -858,7 +862,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
private void commit(Region region, List<Mutation> mutations, byte[] indexUUID, long blockingMemStoreSize,
- byte[] indexMaintainersPtr, byte[] txState, HTable targetHTable, boolean useIndexProto,
+ byte[] indexMaintainersPtr, byte[] txState, Table targetHTable, boolean useIndexProto,
boolean isPKChanging)
throws IOException {
List<Mutation> localRegionMutations = Lists.newArrayList();
@@ -872,7 +876,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
remoteRegionMutations.clear();
}
- private void separateLocalAndRemoteMutations(HTable targetHTable, Region region, List<Mutation> mutations,
+ private void separateLocalAndRemoteMutations(Table targetHTable, Region region, List<Mutation> mutations,
List<Mutation> localRegionMutations, List<Mutation> remoteRegionMutations,
boolean isPKChanging){
boolean areMutationsInSameTable = areMutationsInSameTable(targetHTable, region);
@@ -894,8 +898,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
}
- private boolean areMutationsInSameTable(HTable targetHTable, Region region) {
- return (targetHTable == null || Bytes.compareTo(targetHTable.getTableName(),
+ private boolean areMutationsInSameTable(Table targetHTable, Region region) {
+ return (targetHTable == null || Bytes.compareTo(targetHTable.getName(),
region.getTableDesc().getTableName().getName()) == 0);
}
@@ -1074,8 +1078,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
}
if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
- region.batchMutate(mutations.toArray(new Mutation[mutations.size()]), HConstants.NO_NONCE,
- HConstants.NO_NONCE);
+ region.batchMutate(mutations.toArray(new Mutation[mutations.size()]));
uuidValue = ServerCacheClient.generateId();
mutations.clear();
}
@@ -1084,8 +1087,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
} while (hasMore);
if (!mutations.isEmpty()) {
- region.batchMutate(mutations.toArray(new Mutation[mutations.size()]), HConstants.NO_NONCE,
- HConstants.NO_NONCE);
+ region.batchMutate(mutations.toArray(new Mutation[mutations.size()]));
}
}
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/62027bff/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionSplitPolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionSplitPolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionSplitPolicy.java
index 13a3047..8fd3da5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionSplitPolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionSplitPolicy.java
@@ -18,9 +18,10 @@
package org.apache.phoenix.hbase.index;
import java.util.List;
+import java.util.Optional;
+import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.IncreasingToUpperBoundRegionSplitPolicy;
-import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.query.QueryConstants;
@@ -42,29 +43,29 @@ public class IndexRegionSplitPolicy extends IncreasingToUpperBoundRegionSplitPol
protected byte[] getSplitPoint() {
byte[] oldSplitPoint = super.getSplitPoint();
if (oldSplitPoint == null) return null;
- List<Store> stores = region.getStores();
+ List<HStore> stores = region.getStores();
byte[] splitPointFromLargestStore = null;
long largestStoreSize = 0;
boolean isLocalIndexKey = false;
- for (Store s : stores) {
- if (s.getFamily().getNameAsString()
+ for (HStore s : stores) {
+ if (s.getColumnFamilyName()
.startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) {
- byte[] splitPoint = s.getSplitPoint();
- if (oldSplitPoint != null && splitPoint != null
- && Bytes.compareTo(oldSplitPoint, splitPoint) == 0) {
+ Optional<byte[]> splitPoint = s.getSplitPoint();
+ if (oldSplitPoint != null && splitPoint.isPresent()
+ && Bytes.compareTo(oldSplitPoint, splitPoint.get()) == 0) {
isLocalIndexKey = true;
}
}
}
if (!isLocalIndexKey) return oldSplitPoint;
- for (Store s : stores) {
- if (!s.getFamily().getNameAsString()
+ for (HStore s : stores) {
+ if (!s.getColumnFamilyName()
.startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) {
- byte[] splitPoint = s.getSplitPoint();
+ Optional<byte[]> splitPoint = s.getSplitPoint();
long storeSize = s.getSize();
- if (splitPoint != null && largestStoreSize < storeSize) {
- splitPointFromLargestStore = splitPoint;
+ if (splitPoint.isPresent() && largestStoreSize < storeSize) {
+ splitPointFromLargestStore = splitPoint.get();
largestStoreSize = storeSize;
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/62027bff/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index 1c78fff..5f4afe5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -335,8 +335,7 @@ public class Indexer implements RegionObserver, RegionCoprocessor {
if (!mutations.isEmpty()) {
Region region = e.getEnvironment().getRegion();
// Otherwise, submit the mutations directly here
- region.batchMutate(mutations.toArray(new Mutation[0]), HConstants.NO_NONCE,
- HConstants.NO_NONCE);
+ region.batchMutate(mutations.toArray(new Mutation[0]));
}
return Result.EMPTY_RESULT;
} catch (Throwable t) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/62027bff/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java
index 8f36fd6..c1cce01 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java
@@ -203,7 +203,7 @@ class DefaultStatisticsCollector implements StatisticsCollector {
// families when we're collecting stats for a local index.
boolean collectingForLocalIndex = scan != null && !scan.getFamilyMap().isEmpty() && MetaDataUtil.isLocalIndexFamily(scan.getFamilyMap().keySet().iterator().next());
for (Store store : region.getStores()) {
- ImmutableBytesPtr cfKey = new ImmutableBytesPtr(store.getFamily().getName());
+ ImmutableBytesPtr cfKey = new ImmutableBytesPtr(store.getColumnFamilyDescriptor().getName());
boolean isLocalIndexStore = MetaDataUtil.isLocalIndexFamily(cfKey);
if (isLocalIndexStore != collectingForLocalIndex) {
continue;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/62027bff/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index c3182c5..65bff14 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -711,9 +711,7 @@ public class IndexUtil {
m.setDurability(Durability.SKIP_WAL);
}
}
- region.batchMutate(
- mutations.toArray(new Mutation[mutations.size()]),
- HConstants.NO_NONCE, HConstants.NO_NONCE);
+ region.batchMutate(mutations.toArray(new Mutation[mutations.size()]));
}
public static MetaDataMutationResult updateIndexState(String indexTableName, long minTimeStamp,
http://git-wip-us.apache.org/repos/asf/phoenix/blob/62027bff/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
index a3c8787..1d7678a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.util;
import java.io.IOException;
import java.sql.SQLException;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -34,6 +35,7 @@ import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.Region.RowLock;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException;
import org.apache.phoenix.exception.PhoenixIOException;
@@ -234,4 +236,22 @@ public class ServerUtil {
endKey) < 0));
}
+ public static RowLock acquireLock(Region region, byte[] key, List<RowLock> locks)
+ throws IOException {
+ RowLock rowLock = region.getRowLock(key, false);
+ if (rowLock == null) {
+ throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key));
+ }
+ locks.add(rowLock);
+ return rowLock;
+ }
+
+ public static void releaseRowLocks(List<RowLock> rowLocks) {
+ if (rowLocks != null) {
+ for (RowLock rowLock : rowLocks) {
+ rowLock.release();
+ }
+ rowLocks.clear();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/62027bff/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ea90e8a..1ad8f53 100644
--- a/pom.xml
+++ b/pom.xml
@@ -66,7 +66,7 @@
<top.dir>${project.basedir}</top.dir>
<!-- Hadoop Versions -->
- <hbase.version>2.0.0-alpha4-SNAPSHOT</hbase.version>
+ <hbase.version>2.0.0-alpha4</hbase.version>
<hadoop-two.version>2.7.1</hadoop-two.version>
<!-- Dependency versions -->