You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by el...@apache.org on 2018/07/25 22:34:59 UTC
[21/50] [abbrv] phoenix git commit: PHOENIX-4773 Move HTable rollback wrapper into Tephra TAL method
PHOENIX-4773 Move HTable rollback wrapper into Tephra TAL method
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/32154dfe
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/32154dfe
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/32154dfe
Branch: refs/heads/4.x-HBase-1.4
Commit: 32154dfe01e769b5bc84e85c8de481abf9c1a2cc
Parents: d900771
Author: James Taylor <ja...@apache.org>
Authored: Mon Jun 4 20:27:36 2018 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Tue Jun 5 22:03:56 2018 -0700
----------------------------------------------------------------------
.../apache/phoenix/cache/ServerCacheClient.java | 21 +-
.../apache/phoenix/execute/HashJoinPlan.java | 7 +-
.../apache/phoenix/execute/MutationState.java | 190 ++-----------------
.../PhoenixTxIndexMutationGenerator.java | 42 ++++
.../phoenix/index/IndexMetaDataCacheClient.java | 67 ++++++-
.../apache/phoenix/join/HashCacheClient.java | 5 +-
.../transaction/OmidTransactionContext.java | 3 +-
.../transaction/PhoenixTransactionContext.java | 5 +-
.../transaction/TephraTransactionContext.java | 91 ++++++++-
.../java/org/apache/phoenix/util/IndexUtil.java | 8 +
10 files changed, 230 insertions(+), 209 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/32154dfe/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
index 68de747..5e284bd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
@@ -70,7 +70,6 @@ import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
-import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.util.Closeables;
import org.apache.phoenix.util.SQLCloseable;
import org.apache.phoenix.util.SQLCloseables;
@@ -90,7 +89,7 @@ public class ServerCacheClient {
private static final Random RANDOM = new Random();
public static final String HASH_JOIN_SERVER_CACHE_RESEND_PER_SERVER = "hash.join.server.cache.resend.per.server";
private final PhoenixConnection connection;
- private final Map<Integer, TableRef> cacheUsingTableRefMap = new ConcurrentHashMap<Integer, TableRef>();
+ private final Map<Integer, PTable> cacheUsingTableMap = new ConcurrentHashMap<Integer, PTable>();
/**
* Construct client used to create a serialized cached snapshot of a table and send it to each region server
@@ -220,12 +219,12 @@ public class ServerCacheClient {
}
public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final byte[] txState,
- final ServerCacheFactory cacheFactory, final TableRef cacheUsingTableRef) throws SQLException {
- return addServerCache(keyRanges, cachePtr, txState, cacheFactory, cacheUsingTableRef, false);
+ final ServerCacheFactory cacheFactory, final PTable cacheUsingTable) throws SQLException {
+ return addServerCache(keyRanges, cachePtr, txState, cacheFactory, cacheUsingTable, false);
}
public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final byte[] txState,
- final ServerCacheFactory cacheFactory, final TableRef cacheUsingTableRef, boolean storeCacheOnClient)
+ final ServerCacheFactory cacheFactory, final PTable cacheUsingTable, boolean storeCacheOnClient)
throws SQLException {
ConnectionQueryServices services = connection.getQueryServices();
List<Closeable> closeables = new ArrayList<Closeable>();
@@ -241,7 +240,6 @@ public class ServerCacheClient {
ExecutorService executor = services.getExecutor();
List<Future<Boolean>> futures = Collections.emptyList();
try {
- final PTable cacheUsingTable = cacheUsingTableRef.getTable();
List<HRegionLocation> locations = services.getAllTableRegions(cacheUsingTable.getPhysicalName().getBytes());
int nRegions = locations.size();
// Size these based on worst case
@@ -258,7 +256,7 @@ public class ServerCacheClient {
servers.add(entry);
if (LOG.isDebugEnabled()) {LOG.debug(addCustomAnnotations("Adding cache entry to be sent for " + entry, connection));}
final byte[] key = getKeyInRegion(entry.getRegionInfo().getStartKey());
- final HTableInterface htable = services.getTable(cacheUsingTableRef.getTable().getPhysicalName().getBytes());
+ final HTableInterface htable = services.getTable(cacheUsingTable.getPhysicalName().getBytes());
closeables.add(htable);
futures.add(executor.submit(new JobCallable<Boolean>() {
@@ -294,7 +292,7 @@ public class ServerCacheClient {
future.get(timeoutMs, TimeUnit.MILLISECONDS);
}
- cacheUsingTableRefMap.put(Bytes.mapKey(cacheId), cacheUsingTableRef);
+ cacheUsingTableMap.put(Bytes.mapKey(cacheId), cacheUsingTable);
success = true;
} catch (SQLException e) {
firstException = e;
@@ -337,9 +335,8 @@ public class ServerCacheClient {
try {
ConnectionQueryServices services = connection.getQueryServices();
Throwable lastThrowable = null;
- TableRef cacheUsingTableRef = cacheUsingTableRefMap.get(Bytes.mapKey(cacheId));
- final PTable cacheUsingTable = cacheUsingTableRef.getTable();
- byte[] tableName = cacheUsingTableRef.getTable().getPhysicalName().getBytes();
+ final PTable cacheUsingTable = cacheUsingTableMap.get(Bytes.mapKey(cacheId));
+ byte[] tableName = cacheUsingTable.getPhysicalName().getBytes();
iterateOverTable = services.getTable(tableName);
List<HRegionLocation> locations = services.getAllTableRegions(tableName);
@@ -403,7 +400,7 @@ public class ServerCacheClient {
lastThrowable);
}
} finally {
- cacheUsingTableRefMap.remove(cacheId);
+ cacheUsingTableMap.remove(cacheId);
Closeables.closeQuietly(iterateOverTable);
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/32154dfe/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 5b433b3..bfe089d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -66,7 +66,10 @@ import org.apache.phoenix.join.HashCacheClient;
import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
import org.apache.phoenix.optimize.Cost;
-import org.apache.phoenix.parse.*;
+import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.parse.ParseNode;
+import org.apache.phoenix.parse.SQLParser;
+import org.apache.phoenix.parse.SelectStatement;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
@@ -494,7 +497,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
cache =
parent.hashClient.addHashCache(ranges, iterator,
plan.getEstimatedSize(), hashExpressions, singleValueOnly,
- parent.delegate.getTableRef(), keyRangeRhsExpression,
+ parent.delegate.getTableRef().getTable(), keyRangeRhsExpression,
keyRangeRhsValues);
long endTime = System.currentTimeMillis();
boolean isSet = parent.firstJobEndTime.compareAndSet(0, endTime);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/32154dfe/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 2e795b1..c29d6b5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -38,7 +38,6 @@ import javax.annotation.Nonnull;
import javax.annotation.concurrent.Immutable;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -51,7 +50,6 @@ import org.apache.phoenix.cache.ServerCacheClient;
import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
import org.apache.phoenix.compile.MutationPlan;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
-import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
@@ -60,10 +58,8 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.IndexMetaDataCacheClient;
import org.apache.phoenix.index.PhoenixIndexBuilder;
-import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
import org.apache.phoenix.index.PhoenixIndexFailurePolicy.MutateCommand;
-import org.apache.phoenix.index.PhoenixIndexMetaData;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
import org.apache.phoenix.monitoring.GlobalClientMetrics;
@@ -94,14 +90,10 @@ import org.apache.phoenix.transaction.PhoenixTransactionContext;
import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
import org.apache.phoenix.transaction.TransactionFactory;
import org.apache.phoenix.transaction.TransactionFactory.Provider;
-import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.KeyValueUtil;
import org.apache.phoenix.util.LogUtil;
-import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.SQLCloseable;
-import org.apache.phoenix.util.SQLCloseables;
-import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.SizedUtil;
import org.apache.phoenix.util.TransactionUtil;
@@ -499,19 +491,13 @@ public class MutationState implements SQLCloseable {
return ptr;
}
- private static List<PTable> getClientMaintainedIndexes(PTable table) {
- Iterator<PTable> indexIterator = // Only maintain tables with immutable rows through this client-side mechanism
- (table.isImmutableRows() || table.isTransactional()) ? IndexMaintainer.maintainedGlobalIndexes(table
- .getIndexes().iterator()) : Collections.<PTable> emptyIterator();
- return Lists.newArrayList(indexIterator);
- }
-
private Iterator<Pair<PName, List<Mutation>>> addRowMutations(final TableRef tableRef,
final MultiRowMutationState values, final long mutationTimestamp, final long serverTimestamp,
boolean includeAllIndexes, final boolean sendAll) {
final PTable table = tableRef.getTable();
- final List<PTable> indexList = includeAllIndexes ? Lists.newArrayList(IndexMaintainer.maintainedIndexes(table
- .getIndexes().iterator())) : getClientMaintainedIndexes(table);
+ final List<PTable> indexList = includeAllIndexes ?
+ Lists.newArrayList(IndexMaintainer.maintainedIndexes(table.getIndexes().iterator())) :
+ IndexUtil.getClientMaintainedIndexes(table);
final Iterator<PTable> indexes = indexList.iterator();
final List<Mutation> mutationList = Lists.newArrayListWithExpectedSize(values.size());
final List<Mutation> mutationsPertainingToIndex = indexes.hasNext() ? Lists
@@ -541,7 +527,7 @@ public class MutationState implements SQLCloseable {
if (!mutationsPertainingToIndex.isEmpty()) {
if (table.isTransactional()) {
if (indexMutationsMap == null) {
- PhoenixTxIndexMutationGenerator generator = newTxIndexMutationGenerator(table,
+ PhoenixTxIndexMutationGenerator generator = PhoenixTxIndexMutationGenerator.newGenerator(connection, table,
indexList, mutationsPertainingToIndex.get(0).getAttributesMap());
try (HTableInterface htable = connection.getQueryServices().getTable(
table.getPhysicalName().getBytes())) {
@@ -596,43 +582,6 @@ public class MutationState implements SQLCloseable {
};
}
- private PhoenixTxIndexMutationGenerator newTxIndexMutationGenerator(PTable table, List<PTable> indexes,
- Map<String, byte[]> attributes) {
- final List<IndexMaintainer> indexMaintainers = Lists.newArrayListWithExpectedSize(indexes.size());
- for (PTable index : indexes) {
- IndexMaintainer maintainer = index.getIndexMaintainer(table, connection);
- indexMaintainers.add(maintainer);
- }
- IndexMetaDataCache indexMetaDataCache = new IndexMetaDataCache() {
-
- @Override
- public void close() throws IOException {}
-
- @Override
- public List<IndexMaintainer> getIndexMaintainers() {
- return indexMaintainers;
- }
-
- @Override
- public PhoenixTransactionContext getTransactionContext() {
- return phoenixTransactionContext.newTransactionContext(phoenixTransactionContext, true);
- }
-
- @Override
- public int getClientVersion() {
- return MetaDataProtocol.PHOENIX_VERSION;
- }
-
- };
- try {
- PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(indexMetaDataCache, attributes);
- return new PhoenixTxIndexMutationGenerator(connection.getQueryServices().getConfiguration(), indexMetaData,
- table.getPhysicalName().getBytes());
- } catch (IOException e) {
- throw new RuntimeException(e); // Impossible
- }
- }
-
private void generateMutations(final TableRef tableRef, final long mutationTimestamp, final long serverTimestamp,
final MultiRowMutationState values, final List<Mutation> mutationList,
final List<Mutation> mutationsPertainingToIndex) {
@@ -793,7 +742,6 @@ public class MutationState implements SQLCloseable {
private long validateAndGetServerTimestamp(TableRef tableRef, MultiRowMutationState rowKeyToColumnMap)
throws SQLException {
- Long scn = connection.getSCN();
MetaDataClient client = new MetaDataClient(connection);
long serverTimeStamp = tableRef.getTimeStamp();
// If we're auto committing, we've already validated the schema when we got the ColumnResolver,
@@ -862,65 +810,6 @@ public class MutationState implements SQLCloseable {
return batchCount;
}
- private class MetaDataAwareHTable extends DelegateHTable {
- private final TableRef tableRef;
-
- private MetaDataAwareHTable(HTableInterface delegate, TableRef tableRef) {
- super(delegate);
- this.tableRef = tableRef;
- }
-
- /**
- * Called by Tephra when a transaction is aborted. We have this wrapper so that we get an opportunity to attach
- * our index meta data to the mutations such that we can also undo the index mutations.
- */
- @Override
- public void delete(List<Delete> deletes) throws IOException {
- ServerCache cache = null;
- try {
- if (deletes.isEmpty()) { return; }
- // Attach meta data for server maintained indexes
- PTable table = tableRef.getTable();
- ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable();
- if (table.getIndexMaintainers(indexMetaDataPtr, connection)) {
- cache = setMetaDataOnMutations(tableRef, deletes, indexMetaDataPtr);
- }
-
- // Send deletes for client maintained indexes
- List<PTable> indexes = getClientMaintainedIndexes(table);
- if (!indexes.isEmpty()) {
- PhoenixTxIndexMutationGenerator generator = newTxIndexMutationGenerator(table, indexes, deletes.get(0)
- .getAttributesMap());
- Collection<Pair<Mutation, byte[]>> indexUpdates = generator.getIndexUpdates(delegate,
- deletes.iterator());
- for (PTable index : indexes) {
- byte[] physicalName = index.getPhysicalName().getBytes();
- try (HTableInterface hindex = connection.getQueryServices().getTable(physicalName)) {
- List<Mutation> indexDeletes = Lists.newArrayListWithExpectedSize(deletes.size());
- for (Pair<Mutation, byte[]> mutationPair : indexUpdates) {
- if (Bytes.equals(mutationPair.getSecond(), physicalName)) {
- indexDeletes.add(mutationPair.getFirst());
- }
- hindex.batch(indexDeletes);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException(e);
- }
- }
- }
-
- delegate.delete(deletes);
- } catch (SQLException e) {
- throw new IOException(e);
- } finally {
- if (cache != null) {
- SQLCloseables.closeAllQuietly(Collections.singletonList(cache));
- }
- }
- }
- }
-
private static class TableInfo {
private final boolean isDataTable;
@@ -1059,8 +948,9 @@ public class MutationState implements SQLCloseable {
TableRef origTableRef = tableInfo.getOrigTableRef();
PTable table = origTableRef.getTable();
table.getIndexMaintainers(indexMetaDataPtr, connection);
- final ServerCache cache = tableInfo.isDataTable() ? setMetaDataOnMutations(origTableRef,
- mutationList, indexMetaDataPtr) : null;
+ final ServerCache cache = tableInfo.isDataTable() ?
+ IndexMetaDataCacheClient.setMetaDataOnMutations(connection, table,
+ mutationList, indexMetaDataPtr) : null;
// If we haven't retried yet, retry for this case only, as it's possible that
// a split will occur after we send the index metadata cache to all known
// region servers.
@@ -1070,19 +960,12 @@ public class MutationState implements SQLCloseable {
try {
if (table.isTransactional()) {
// Track tables to which we've sent uncommitted data
- uncommittedPhysicalNames.add(table.getPhysicalName().getString());
- phoenixTransactionContext.markDMLFence(table);
-
- // If we have indexes, wrap the HTable in a delegate HTable that
- // will attach the necessary index meta data in the event of a
- // rollback
- if (!table.getIndexes().isEmpty()) {
- hTable = new MetaDataAwareHTable(hTable, origTableRef);
+ if (tableInfo.isDataTable()) {
+ uncommittedPhysicalNames.add(table.getPhysicalName().getString());
+ phoenixTransactionContext.markDMLFence(table);
}
-
- hTable = phoenixTransactionContext.getTransactionalTableWriter(hTable, table);
+ hTable = phoenixTransactionContext.getTransactionalTableWriter(connection, table, hTable, !tableInfo.isDataTable());
}
-
numMutations = mutationList.size();
GLOBAL_MUTATION_BATCH_SIZE.update(numMutations);
mutationSizeBytes = calculateMutationSize(mutationList);
@@ -1236,57 +1119,6 @@ public class MutationState implements SQLCloseable {
return phoenixTransactionContext.encodeTransaction();
}
- private ServerCache setMetaDataOnMutations(TableRef tableRef, List<? extends Mutation> mutations,
- ImmutableBytesWritable indexMetaDataPtr) throws SQLException {
- PTable table = tableRef.getTable();
- final byte[] tenantIdBytes;
- if (table.isMultiTenant()) {
- tenantIdBytes = connection.getTenantId() == null ? null : ScanUtil.getTenantIdBytes(
- table.getRowKeySchema(), table.getBucketNum() != null, connection.getTenantId(),
- table.getViewIndexId() != null);
- } else {
- tenantIdBytes = connection.getTenantId() == null ? null : connection.getTenantId().getBytes();
- }
- ServerCache cache = null;
- byte[] attribValue = null;
- byte[] uuidValue = null;
- byte[] txState = ByteUtil.EMPTY_BYTE_ARRAY;
- if (table.isTransactional()) {
- txState = encodeTransaction();
- }
- boolean hasIndexMetaData = indexMetaDataPtr.getLength() > 0;
- if (hasIndexMetaData) {
- if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, indexMetaDataPtr.getLength()
- + txState.length)) {
- IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
- cache = client.addIndexMetadataCache(mutations, indexMetaDataPtr, txState);
- uuidValue = cache.getId();
- } else {
- attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
- uuidValue = ServerCacheClient.generateId();
- }
- } else if (txState.length == 0) { return null; }
- // Either set the UUID to be able to access the index metadata from the cache
- // or set the index metadata directly on the Mutation
- for (Mutation mutation : mutations) {
- if (connection.getTenantId() != null) {
- mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantIdBytes);
- }
- mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
- if (attribValue != null) {
- mutation.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue);
- mutation.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION,
- Bytes.toBytes(MetaDataProtocol.PHOENIX_VERSION));
- if (txState.length > 0) {
- mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
- }
- } else if (!hasIndexMetaData && txState.length > 0) {
- mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
- }
- }
- return cache;
- }
-
private void addUncommittedStatementIndexes(Collection<RowMutationState> rowMutations) {
for (RowMutationState rowMutationState : rowMutations) {
uncommittedStatementIndexes = joinSortedIntArrays(uncommittedStatementIndexes,
http://git-wip-us.apache.org/repos/asf/phoenix/blob/32154dfe/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
index 0016fa9..a7b5687 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
@@ -45,7 +45,9 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.cache.IndexMetaDataCache;
import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.filter.SkipScanFilter;
import org.apache.phoenix.hbase.index.MultiMutation;
import org.apache.phoenix.hbase.index.ValueGetter;
@@ -59,7 +61,9 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.index.PhoenixIndexMetaData;
+import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.transaction.PhoenixTransactionContext;
import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
@@ -440,4 +444,42 @@ public class PhoenixTxIndexMutationGenerator {
return pair;
}
}
+
+ public static PhoenixTxIndexMutationGenerator newGenerator(final PhoenixConnection connection, PTable table, List<PTable> indexes,
+ Map<String, byte[]> attributes) {
+ final List<IndexMaintainer> indexMaintainers = Lists.newArrayListWithExpectedSize(indexes.size());
+ for (PTable index : indexes) {
+ IndexMaintainer maintainer = index.getIndexMaintainer(table, connection);
+ indexMaintainers.add(maintainer);
+ }
+ IndexMetaDataCache indexMetaDataCache = new IndexMetaDataCache() {
+
+ @Override
+ public void close() throws IOException {}
+
+ @Override
+ public List<IndexMaintainer> getIndexMaintainers() {
+ return indexMaintainers;
+ }
+
+ @Override
+ public PhoenixTransactionContext getTransactionContext() {
+ PhoenixTransactionContext context = connection.getMutationState().getPhoenixTransactionContext();
+ return context.newTransactionContext(context, true);
+ }
+
+ @Override
+ public int getClientVersion() {
+ return MetaDataProtocol.PHOENIX_VERSION;
+ }
+
+ };
+ try {
+ PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(indexMetaDataCache, attributes);
+ return new PhoenixTxIndexMutationGenerator(connection.getQueryServices().getConfiguration(), indexMetaData,
+ table.getPhysicalName().getBytes());
+ } catch (IOException e) {
+ throw new RuntimeException(e); // Impossible
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/32154dfe/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
index fcabdfd..bd308c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
@@ -24,20 +24,25 @@ import java.util.List;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.cache.ServerCacheClient;
import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.join.MaxServerCacheSizeExceededException;
import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.ScanUtil;
public class IndexMetaDataCacheClient {
private final ServerCacheClient serverCache;
- private TableRef cacheUsingTableRef;
+ private PTable cacheUsingTable;
/**
* Construct client used to send index metadata to each region server
@@ -45,9 +50,9 @@ public class IndexMetaDataCacheClient {
* @param connection the client connection
* @param cacheUsingTableRef table ref to table that will use the cache during its scan
*/
- public IndexMetaDataCacheClient(PhoenixConnection connection, TableRef cacheUsingTableRef) {
+ public IndexMetaDataCacheClient(PhoenixConnection connection, PTable cacheUsingTable) {
serverCache = new ServerCacheClient(connection);
- this.cacheUsingTableRef = cacheUsingTableRef;
+ this.cacheUsingTable = cacheUsingTable;
}
/**
@@ -75,7 +80,7 @@ public class IndexMetaDataCacheClient {
/**
* Serialize and compress hashCacheTable
*/
- return serverCache.addServerCache(ScanUtil.newScanRanges(mutations), ptr, txState, new IndexMetaDataCacheFactory(), cacheUsingTableRef);
+ return serverCache.addServerCache(ScanUtil.newScanRanges(mutations), ptr, txState, new IndexMetaDataCacheFactory(), cacheUsingTable);
}
@@ -91,7 +96,55 @@ public class IndexMetaDataCacheClient {
/**
* Serialize and compress hashCacheTable
*/
- return serverCache.addServerCache(ranges, ptr, txState, new IndexMetaDataCacheFactory(), cacheUsingTableRef);
+ return serverCache.addServerCache(ranges, ptr, txState, new IndexMetaDataCacheFactory(), cacheUsingTable);
+ }
+
+ public static ServerCache setMetaDataOnMutations(PhoenixConnection connection, PTable table, List<? extends Mutation> mutations,
+ ImmutableBytesWritable indexMetaDataPtr) throws SQLException {
+ final byte[] tenantIdBytes;
+ if (table.isMultiTenant()) {
+ tenantIdBytes = connection.getTenantId() == null ? null : ScanUtil.getTenantIdBytes(
+ table.getRowKeySchema(), table.getBucketNum() != null, connection.getTenantId(),
+ table.getViewIndexId() != null);
+ } else {
+ tenantIdBytes = connection.getTenantId() == null ? null : connection.getTenantId().getBytes();
+ }
+ ServerCache cache = null;
+ byte[] attribValue = null;
+ byte[] uuidValue = null;
+ byte[] txState = ByteUtil.EMPTY_BYTE_ARRAY;
+ if (table.isTransactional()) {
+ txState = connection.getMutationState().encodeTransaction();
+ }
+ boolean hasIndexMetaData = indexMetaDataPtr.getLength() > 0;
+ if (hasIndexMetaData) {
+ if (useIndexMetadataCache(connection, mutations, indexMetaDataPtr.getLength() + txState.length)) {
+ IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, table);
+ cache = client.addIndexMetadataCache(mutations, indexMetaDataPtr, txState);
+ uuidValue = cache.getId();
+ } else {
+ attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
+ uuidValue = ServerCacheClient.generateId();
+ }
+ } else if (txState.length == 0) { return null; }
+ // Either set the UUID to be able to access the index metadata from the cache
+ // or set the index metadata directly on the Mutation
+ for (Mutation mutation : mutations) {
+ if (connection.getTenantId() != null) {
+ mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantIdBytes);
+ }
+ mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+ if (attribValue != null) {
+ mutation.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue);
+ mutation.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION,
+ Bytes.toBytes(MetaDataProtocol.PHOENIX_VERSION));
+ if (txState.length > 0) {
+ mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
+ }
+ } else if (!hasIndexMetaData && txState.length > 0) {
+ mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
+ }
+ }
+ return cache;
}
-
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/32154dfe/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
index 2ec509c..83ac32d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
@@ -37,7 +37,6 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.util.ByteUtil;
@@ -77,13 +76,13 @@ public class HashCacheClient {
* @throws MaxServerCacheSizeExceededException if size of hash cache exceeds max allowed
* size
*/
- public ServerCache addHashCache(ScanRanges keyRanges, ResultIterator iterator, long estimatedSize, List<Expression> onExpressions, boolean singleValueOnly, TableRef cacheUsingTableRef, Expression keyRangeRhsExpression, List<Expression> keyRangeRhsValues) throws SQLException {
+ public ServerCache addHashCache(ScanRanges keyRanges, ResultIterator iterator, long estimatedSize, List<Expression> onExpressions, boolean singleValueOnly, PTable cacheUsingTable, Expression keyRangeRhsExpression, List<Expression> keyRangeRhsValues) throws SQLException {
/**
* Serialize and compress hashCacheTable
*/
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
serialize(ptr, iterator, estimatedSize, onExpressions, singleValueOnly, keyRangeRhsExpression, keyRangeRhsValues);
- ServerCache cache = serverCache.addServerCache(keyRanges, ptr, ByteUtil.EMPTY_BYTE_ARRAY, new HashCacheFactory(), cacheUsingTableRef, true);
+ ServerCache cache = serverCache.addServerCache(keyRanges, ptr, ByteUtil.EMPTY_BYTE_ARRAY, new HashCacheFactory(), cacheUsingTable, true);
return cache;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/32154dfe/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
index ab9e8a6..d235d4b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.transaction;
import java.sql.SQLException;
import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.transaction.TransactionFactory.Provider;
@@ -132,7 +133,7 @@ public class OmidTransactionContext implements PhoenixTransactionContext {
}
@Override
- public HTableInterface getTransactionalTableWriter(HTableInterface htable, PTable table) {
+ public HTableInterface getTransactionalTableWriter(PhoenixConnection connection, PTable table, HTableInterface htable, boolean isIndex) {
// TODO Auto-generated method stub
return null;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/32154dfe/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
index 751945a..dfa35be 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
@@ -21,6 +21,7 @@ import java.sql.SQLException;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.transaction.TransactionFactory.Provider;
@@ -110,7 +111,7 @@ public interface PhoenixTransactionContext {
}
@Override
- public HTableInterface getTransactionalTableWriter(HTableInterface htable, PTable table) {
+ public HTableInterface getTransactionalTableWriter(PhoenixConnection connection, PTable table, HTableInterface htable, boolean isIndex) {
return null;
}
};
@@ -230,5 +231,5 @@ public interface PhoenixTransactionContext {
public PhoenixTransactionContext newTransactionContext(PhoenixTransactionContext contex, boolean subTask);
public HTableInterface getTransactionalTable(HTableInterface htable, boolean isImmutable);
- public HTableInterface getTransactionalTableWriter(HTableInterface htable, PTable table);
+ public HTableInterface getTransactionalTableWriter(PhoenixConnection connection, PTable table, HTableInterface htable, boolean isIndex) throws SQLException;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/32154dfe/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
index bc33cff..5b3c9b8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
@@ -20,20 +20,31 @@ package org.apache.phoenix.transaction;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.DelegateHTable;
+import org.apache.phoenix.execute.PhoenixTxIndexMutationGenerator;
+import org.apache.phoenix.index.IndexMetaDataCacheClient;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.transaction.TephraTransactionProvider.TephraTransactionClient;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.SQLCloseables;
import org.apache.tephra.Transaction;
import org.apache.tephra.Transaction.VisibilityLevel;
import org.apache.tephra.TransactionAware;
@@ -399,14 +410,19 @@ public class TephraTransactionContext implements PhoenixTransactionContext {
}
@Override
- public HTableInterface getTransactionalTableWriter(HTableInterface htable, PTable table) {
- boolean isIndex = table.getType() == PTableType.INDEX;
- TransactionAwareHTable transactionAwareHTable = new TransactionAwareHTable(htable, table.isImmutableRows() || isIndex ? TxConstants.ConflictDetection.NONE : TxConstants.ConflictDetection.ROW);
+ public HTableInterface getTransactionalTableWriter(PhoenixConnection connection, PTable table, HTableInterface htable, boolean isIndex) throws SQLException {
+ // If we have indexes, wrap the HTable in a delegate HTable that
+ // will attach the necessary index meta data in the event of a
+ // rollback
+ TransactionAwareHTable transactionAwareHTable;
// Don't add immutable indexes (those are the only ones that would participate
// during a commit), as we don't need conflict detection for these.
if (isIndex) {
+ transactionAwareHTable = new TransactionAwareHTable(htable, TxConstants.ConflictDetection.NONE);
transactionAwareHTable.startTx(getTransaction());
} else {
+ htable = new RollbackHookHTableWrapper(htable, table, connection);
+ transactionAwareHTable = new TransactionAwareHTable(htable, table.isImmutableRows() ? TxConstants.ConflictDetection.NONE : TxConstants.ConflictDetection.ROW);
// Even for immutable, we need to do this so that an abort has the state
// necessary to generate the rows to delete.
this.addTransactionAware(transactionAwareHTable);
@@ -414,4 +430,73 @@ public class TephraTransactionContext implements PhoenixTransactionContext {
return transactionAwareHTable;
}
+ /**
+ *
+ * Wraps Tephra data table HTables to catch when a rollback occurs so
+ * that index Delete mutations can be generated and applied (as
+ * opposed to storing them in the Tephra change set). This technique
+ * allows the Tephra API to be used directly with HBase APIs and
+ * Phoenix APIs since we can detect the rollback as a callback
+ * when the Tephra rollback is called.
+ *
+ */
+    private static class RollbackHookHTableWrapper extends DelegateHTable {
+        /** Data table whose index metadata is consulted when deletes are issued. */
+        private final PTable table;
+        /** Connection used to look up index metadata and to open the index tables. */
+        private final PhoenixConnection connection;
+
+        /**
+         * @param delegate the underlying data table that ultimately receives the deletes
+         * @param table the Phoenix table corresponding to {@code delegate}
+         * @param connection the connection used for index meta data and index table access
+         */
+        private RollbackHookHTableWrapper(HTableInterface delegate, PTable table, PhoenixConnection connection) {
+            super(delegate);
+            this.table = table;
+            this.connection = connection;
+        }
+
+        /**
+         * Called by Tephra when a transaction is aborted. We have this wrapper so that we get an opportunity to attach
+         * our index meta data to the mutations such that we can also undo the index mutations.
+         * <p>
+         * The work is done in three steps: index meta data is attached to the deletes for server maintained
+         * indexes, the generated mutations for client maintained indexes are written to each index table, and
+         * finally the deletes are forwarded to the wrapped data table.
+         *
+         * @param deletes the deletes to apply to the data table; nothing is done when the list is empty
+         * @throws IOException if any table operation fails, if a {@link SQLException} is raised while preparing
+         *             the index updates (wrapped as the cause), or if the thread is interrupted while the index
+         *             batch is in flight (the interrupt flag is restored before throwing)
+         */
+        @Override
+        public void delete(List<Delete> deletes) throws IOException {
+            if (deletes.isEmpty()) {
+                return;
+            }
+            ServerCache cache = null;
+            try {
+                // Attach meta data for server maintained indexes
+                ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable();
+                if (table.getIndexMaintainers(indexMetaDataPtr, connection)) {
+                    cache = IndexMetaDataCacheClient.setMetaDataOnMutations(connection, table, deletes, indexMetaDataPtr);
+                }
+
+                // Send deletes for client maintained indexes
+                List<PTable> indexes = IndexUtil.getClientMaintainedIndexes(table);
+                if (!indexes.isEmpty()) {
+                    PhoenixTxIndexMutationGenerator generator = PhoenixTxIndexMutationGenerator.newGenerator(
+                            connection, table, indexes, deletes.get(0).getAttributesMap());
+                    Collection<Pair<Mutation, byte[]>> indexUpdates = generator.getIndexUpdates(delegate,
+                            deletes.iterator());
+                    for (PTable index : indexes) {
+                        byte[] physicalName = index.getPhysicalName().getBytes();
+                        // Collect every generated mutation that targets this index table first ...
+                        List<Mutation> indexDeletes = Lists.newArrayListWithExpectedSize(deletes.size());
+                        for (Pair<Mutation, byte[]> mutationPair : indexUpdates) {
+                            if (Bytes.equals(mutationPair.getSecond(), physicalName)) {
+                                indexDeletes.add(mutationPair.getFirst());
+                            }
+                        }
+                        if (indexDeletes.isEmpty()) {
+                            // Nothing was generated for this index, so there is no need to open its table.
+                            continue;
+                        }
+                        // ... and only then send them, once per index. Issuing the batch inside the collection
+                        // loop would re-send the accumulated mutations on every iteration.
+                        try (HTableInterface hindex = connection.getQueryServices().getTable(physicalName)) {
+                            hindex.batch(indexDeletes);
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            throw new IOException(e);
+                        }
+                    }
+                }
+
+                delegate.delete(deletes);
+            } catch (SQLException e) {
+                throw new IOException(e);
+            } finally {
+                if (cache != null) {
+                    SQLCloseables.closeAllQuietly(Collections.singletonList(cache));
+                }
+            }
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/32154dfe/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 78a68d2..3fe5438 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -32,6 +32,7 @@ import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
@@ -816,5 +817,12 @@ public class IndexUtil {
.setTableName(indexName).build().buildException();
}
}
+
+    /**
+     * Returns the indexes of {@code table} that are maintained through the client-side mechanism.
+     * <p>
+     * Only tables with immutable rows or transactional tables have indexes maintained this way; for any
+     * other table an empty list is returned. Otherwise the result holds whatever
+     * {@link IndexMaintainer#maintainedGlobalIndexes} selects from the table's indexes.
+     *
+     * @param table the data table whose indexes are inspected
+     * @return a newly created list of the client maintained indexes, possibly empty
+     */
+    public static List<PTable> getClientMaintainedIndexes(PTable table) {
+        if (!table.isImmutableRows() && !table.isTransactional()) {
+            return Lists.newArrayList(Collections.<PTable> emptyIterator());
+        }
+        Iterator<PTable> maintained = IndexMaintainer.maintainedGlobalIndexes(table.getIndexes().iterator());
+        return Lists.newArrayList(maintained);
+    }
}