You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2015/04/17 10:28:11 UTC
phoenix git commit: Transactions over mutable indexes mostly working
Repository: phoenix
Updated Branches:
refs/heads/txn 324b566f4 -> 1baa1b6b0
Transactions over mutable indexes mostly working
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/1baa1b6b
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/1baa1b6b
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/1baa1b6b
Branch: refs/heads/txn
Commit: 1baa1b6b0c85ed06d4892648975d7e51998d75a3
Parents: 324b566
Author: James Taylor <jt...@salesforce.com>
Authored: Fri Apr 17 01:28:12 2015 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Fri Apr 17 01:28:12 2015 -0700
----------------------------------------------------------------------
.../apache/phoenix/compile/DeleteCompiler.java | 2 +-
.../apache/phoenix/compile/UpsertCompiler.java | 4 +-
.../apache/phoenix/execute/BaseQueryPlan.java | 2 +-
.../apache/phoenix/execute/MutationState.java | 140 +++++++++++++++++--
.../apache/phoenix/index/PhoenixIndexCodec.java | 8 ++
.../index/PhoenixTransactionalIndexer.java | 68 ++++-----
.../phoenix/iterate/TableResultIterator.java | 8 +-
.../apache/phoenix/jdbc/PhoenixConnection.java | 90 +++---------
.../apache/phoenix/jdbc/PhoenixStatement.java | 6 +-
.../query/ConnectionQueryServicesImpl.java | 3 +-
10 files changed, 199 insertions(+), 132 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1baa1b6b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index a0369d5..0778f75 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -494,7 +494,7 @@ public class DeleteCompiler {
ImmutableBytesWritable ptr = context.getTempPtr();
PTable table = tableRef.getTable();
table.getIndexMaintainers(ptr, context.getConnection());
- byte[] txState = table.isTransactional() ? TransactionUtil.encodeTxnState(connection.getTransactionContext().getCurrentTransaction()) : ByteUtil.EMPTY_BYTE_ARRAY;
+ byte[] txState = table.isTransactional() ? TransactionUtil.encodeTxnState(connection.getMutationState().getTransaction()) : ByteUtil.EMPTY_BYTE_ARRAY;
ServerCache cache = null;
try {
if (ptr.getLength() > 0) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1baa1b6b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index e72b634..67d289e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -163,7 +163,7 @@ public class UpsertCompiler {
if (isAutoCommit && rowCount % batchSize == 0) {
MutationState state = new MutationState(tableRef, mutation, 0, maxSize, connection);
connection.getMutationState().join(state);
- connection.commit();
+ connection.getMutationState().send();
mutation.clear();
}
}
@@ -610,7 +610,7 @@ public class UpsertCompiler {
ImmutableBytesWritable ptr = context.getTempPtr();
PTable table = tableRef.getTable();
table.getIndexMaintainers(ptr, context.getConnection());
- byte[] txState = table.isTransactional() ? TransactionUtil.encodeTxnState(connection.getTransactionContext().getCurrentTransaction()) : ByteUtil.EMPTY_BYTE_ARRAY;
+ byte[] txState = table.isTransactional() ? TransactionUtil.encodeTxnState(connection.getMutationState().getTransaction()) : ByteUtil.EMPTY_BYTE_ARRAY;
ServerCache cache = null;
try {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1baa1b6b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index 387f23d..0a3035c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -269,7 +269,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD, ByteUtil.copyKeyBytesIfNecessary(ptr));
if (dataTable.isTransactional()) {
PhoenixConnection conn = context.getConnection();
- scan.setAttribute(BaseScannerRegionObserver.TX_STATE, TransactionUtil.encodeTxnState(conn.getTransactionContext().getCurrentTransaction()));
+ scan.setAttribute(BaseScannerRegionObserver.TX_STATE, TransactionUtil.encodeTxnState(conn.getMutationState().getTransaction()));
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1baa1b6b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 59bc6ce..a0cf8d2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -28,6 +28,11 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import co.cask.tephra.Transaction;
+import co.cask.tephra.TransactionAware;
+import co.cask.tephra.TransactionContext;
+import co.cask.tephra.TransactionFailureException;
+import co.cask.tephra.TransactionSystemClient;
import co.cask.tephra.hbase98.TransactionAwareHTable;
import org.apache.hadoop.hbase.HConstants;
@@ -88,17 +93,39 @@ public class MutationState implements SQLCloseable {
// rows - map from rowkey to columns
// columns - map from column to value
private final Map<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> mutations = Maps.newHashMapWithExpectedSize(3); // TODO: Sizing?
+ private final Transaction tx;
+ private final List<TransactionAware> txAwares;
+ private final TransactionContext txContext;
+
private long sizeOffset;
private int numRows = 0;
+ private boolean txStarted = false;
public MutationState(int maxSize, PhoenixConnection connection) {
- this(maxSize,connection,0);
+ this(maxSize,connection,null);
+ }
+
+ public MutationState(int maxSize, PhoenixConnection connection, Transaction tx) {
+ this(maxSize,connection, tx, 0);
}
public MutationState(int maxSize, PhoenixConnection connection, long sizeOffset) {
+ this(maxSize, connection, null, sizeOffset);
+ }
+
+ public MutationState(int maxSize, PhoenixConnection connection, Transaction tx, long sizeOffset) {
this.maxSize = maxSize;
this.connection = connection;
this.sizeOffset = sizeOffset;
+ this.tx = tx;
+ if (tx == null) {
+ this.txAwares = Collections.emptyList();
+ TransactionSystemClient txServiceClient = this.connection.getQueryServices().getTransactionSystemClient();
+ this.txContext = new TransactionContext(txServiceClient);
+ } else {
+ txAwares = Lists.newArrayList();
+ txContext = null;
+ }
}
public MutationState(TableRef table, Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutations, long sizeOffset, long maxSize, PhoenixConnection connection) {
@@ -107,13 +134,19 @@ public class MutationState implements SQLCloseable {
this.mutations.put(table, mutations);
this.sizeOffset = sizeOffset;
this.numRows = mutations.size();
+ this.txAwares = Lists.newArrayList();
+ this.txContext = null;
+ this.tx = connection.getMutationState().getTransaction();
throwIfTooBig();
}
- private MutationState(List<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> entries, long sizeOffset, long maxSize, PhoenixConnection connection) {
- this.maxSize = maxSize;
- this.connection = connection;
- this.sizeOffset = sizeOffset;
+ private MutationState(MutationState state, List<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> entries) {
+ this.maxSize = state.maxSize;
+ this.connection = state.connection;
+ this.sizeOffset = state.sizeOffset;
+ this.tx = state.tx;
+ this.txAwares = state.txAwares;
+ this.txContext = state.txContext;
for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : entries) {
numRows += entry.getValue().size();
this.mutations.put(entry.getKey(), entry.getValue());
@@ -121,6 +154,35 @@ public class MutationState implements SQLCloseable {
throwIfTooBig();
}
+ private void addTxParticipant(TransactionAware txAware) throws SQLException {
+ if (txContext == null) {
+ txAwares.add(txAware);
+ assert(tx != null);
+ txAware.startTx(tx);
+ } else {
+ txContext.addTransactionAware(txAware);
+ }
+ }
+
+ public Transaction getTransaction() {
+ return tx != null ? tx : txContext != null ? txContext.getCurrentTransaction() : null;
+ }
+
+ public void startTransaction() throws SQLException {
+ if (txContext == null) {
+ throw new SQLException("No transaction context"); // TODO: error code
+ }
+
+ try {
+ if (!txStarted) {
+ txContext.start();
+ txStarted = true;
+ }
+ } catch (TransactionFailureException e) {
+ throw new SQLException(e); // TODO: error code
+ }
+ }
+
private void throwIfTooBig() {
if (numRows > maxSize) {
// TODO: throw SQLException ?
@@ -141,6 +203,15 @@ public class MutationState implements SQLCloseable {
if (this == newMutation) { // Doesn't make sense
return;
}
+ // TODO: what if new and old have txContext as that's really an error
+ // Really it's an error if newMutation txContext is not null
+ if (txContext != null) {
+ for (TransactionAware txAware : txAwares) {
+ txContext.addTransactionAware(txAware);
+ }
+ } else {
+ txAwares.addAll(newMutation.txAwares);
+ }
this.sizeOffset += newMutation.sizeOffset;
// Merge newMutation with this one, keeping state from newMutation for any overlaps
for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : newMutation.mutations.entrySet()) {
@@ -394,7 +465,10 @@ public class MutationState implements SQLCloseable {
if (hasIndexMaintainers && isDataTable) {
byte[] attribValue = null;
byte[] uuidValue;
- byte[] txState = table.isTransactional() ? TransactionUtil.encodeTxnState(connection.getTransactionContext().getCurrentTransaction()) : ByteUtil.EMPTY_BYTE_ARRAY;
+ byte[] txState = ByteUtil.EMPTY_BYTE_ARRAY;
+ if (table.isTransactional()) {
+ txState = TransactionUtil.encodeTxnState(getTransaction());
+ }
if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, tempPtr.getLength() + txState.length)) {
IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
cache = client.addIndexMetadataCache(mutations, tempPtr, txState);
@@ -421,15 +495,17 @@ public class MutationState implements SQLCloseable {
}
}
}
-
+
SQLException sqlE = null;
HTableInterface hTable = connection.getQueryServices().getTable(htableName);
try {
- // Don't add immutable indexes (those are the only ones that would participate
- // during a commit), as we don't need conflict detection for these.
- if (table.isTransactional() && isDataTable) {
+ if (table.isTransactional()) {
TransactionAwareHTable txnAware = TransactionUtil.getTransactionAwareHTable(hTable);
- connection.addTxParticipant(txnAware);
+ // Don't add immutable indexes (those are the only ones that would participate
+ // during a commit), as we don't need conflict detection for these.
+ if (isDataTable) {
+ addTxParticipant(txnAware);
+ }
hTable = txnAware;
}
logMutationSize(hTable, mutations, connection);
@@ -465,7 +541,7 @@ public class MutationState implements SQLCloseable {
}
// Throw to client with both what was committed so far and what is left to be committed.
// That way, client can either undo what was done or try again with what was not done.
- sqlE = new CommitException(e, this, new MutationState(committedList, this.sizeOffset, this.maxSize, this.connection));
+ sqlE = new CommitException(e, this, new MutationState(this, committedList));
} finally {
try {
if (cache != null) {
@@ -508,4 +584,44 @@ public class MutationState implements SQLCloseable {
@Override
public void close() throws SQLException {
}
+
+ public void rollback() throws SQLException {
+ clear();
+ txAwares.clear();
+ if (txContext != null) {
+ try {
+ if (txStarted) {
+ txContext.abort();
+ }
+ } catch (TransactionFailureException e) {
+ throw new SQLException(e); // TODO: error code
+ } finally {
+ txStarted = false;
+ }
+ }
+ }
+
+ public void commit() throws SQLException {
+ try {
+ send();
+ } finally {
+ txAwares.clear();
+ if (txContext != null) {
+ try {
+ if (txStarted) {
+ txContext.finish();
+ }
+ } catch (TransactionFailureException e) {
+ try {
+ txContext.abort(e);
+ throw TransactionUtil.getSQLException(e);
+ } catch (TransactionFailureException e1) {
+ throw TransactionUtil.getSQLException(e);
+ }
+ } finally {
+ txStarted = false;
+ }
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1baa1b6b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
index 1fe9931..36b849d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
@@ -87,6 +87,10 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
indexUpdate.setTable(maintainer.getIndexTableName());
Put put = maintainer.buildUpdateMutation(KV_BUILDER, valueGetter, ptr, state.getCurrentTimestamp(), env
.getRegion().getStartKey(), env.getRegion().getEndKey());
+ if (put == null) {
+ throw new IllegalStateException("Null put for " + env.getRegion().getRegionInfo().getTable().getNameAsString()
+ + ": " + Bytes.toStringBinary(ptr.get(), ptr.getOffset(), ptr.getLength()));
+ }
indexUpdate.setUpdate(put);
indexUpdates.add(indexUpdate);
}
@@ -112,6 +116,10 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
indexUpdate.setTable(maintainer.getIndexTableName());
Delete delete = maintainer.buildDeleteMutation(KV_BUILDER, valueGetter, ptr, state.getPendingUpdate(),
state.getCurrentTimestamp(), env.getRegion().getStartKey(), env.getRegion().getEndKey());
+ if (delete == null) {
+ throw new IllegalStateException("Null put for " + env.getRegion().getRegionInfo().getTable().getNameAsString()
+ + ": " + Bytes.toStringBinary(ptr.get(), ptr.getOffset(), ptr.getLength()));
+ }
indexUpdate.setUpdate(delete);
indexUpdates.add(indexUpdate);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1baa1b6b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index f6e6806..adba507 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -187,13 +187,13 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
if (scanner != null) {
Result result;
while ((result = scanner.next()) != null) {
- TxTableState state = new TxTableState(env, mutableColumns, updateAttributes, tx.getWritePointer(), result);
+ Mutation m = mutations.remove(new ImmutableBytesPtr(result.getRow()));
+ TxTableState state = new TxTableState(env, mutableColumns, updateAttributes, tx.getWritePointer(), m, result);
Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state, indexMetaData);
for (IndexUpdate delete : deletes) {
indexUpdates.add(new Pair<Mutation, byte[]>(delete.getUpdate(),delete.getTableName()));
}
- Mutation m = mutations.get(new ImmutableBytesPtr(result.getRow()));
- state.applyMutation(m);
+ state.applyMutation();
Iterable<IndexUpdate> updates = codec.getIndexUpserts(state, indexMetaData);
for (IndexUpdate update : updates) {
indexUpdates.add(new Pair<Mutation, byte[]>(update.getUpdate(),update.getTableName()));
@@ -202,7 +202,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
}
for (Mutation m : mutations.values()) {
TxTableState state = new TxTableState(env, mutableColumns, updateAttributes, tx.getWritePointer(), m);
- state.applyMutation(m);
+ state.applyMutation();
Iterable<IndexUpdate> updates = codec.getIndexUpserts(state, indexMetaData);
for (IndexUpdate update : updates) {
indexUpdates.add(new Pair<Mutation, byte[]>(update.getUpdate(),update.getTableName()));
@@ -217,7 +217,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
private static class TxTableState implements TableState {
- private final byte[] rowKey;
+ private final Mutation mutation;
private final long currentTimestamp;
private final RegionCoprocessorEnvironment env;
private final Map<String, byte[]> attributes;
@@ -225,24 +225,28 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
private final Set<ColumnReference> indexedColumns;
private final Map<ColumnReference, ImmutableBytesWritable> valueMap;
- private TxTableState(RegionCoprocessorEnvironment env, Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long currentTimestamp, byte[] rowKey) {
+ private TxTableState(RegionCoprocessorEnvironment env, Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long currentTimestamp, Mutation mutation) {
this.env = env;
this.currentTimestamp = currentTimestamp;
this.indexedColumns = indexedColumns;
this.attributes = attributes;
- this.rowKey = rowKey;
+ this.mutation = mutation;
int estimatedSize = indexedColumns.size();
this.valueMap = Maps.newHashMapWithExpectedSize(estimatedSize);
this.pendingUpdates = Lists.newArrayListWithExpectedSize(estimatedSize);
+ try {
+ CellScanner scanner = mutation.cellScanner();
+ while (scanner.advance()) {
+ Cell cell = scanner.current();
+ pendingUpdates.add(cell);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e); // Impossible
+ }
}
- public TxTableState(RegionCoprocessorEnvironment env, Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long currentTimestamp, Mutation m) {
- this(env, indexedColumns, attributes, currentTimestamp, m.getRow());
- applyMutation(m);
- }
-
- public TxTableState(RegionCoprocessorEnvironment env, Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long currentTimestamp, Result r) {
- this(env, indexedColumns, attributes, currentTimestamp, r.getRow());
+ public TxTableState(RegionCoprocessorEnvironment env, Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long currentTimestamp, Mutation m, Result r) {
+ this(env, indexedColumns, attributes, currentTimestamp, m);
for (ColumnReference ref : indexedColumns) {
Cell cell = r.getColumnLatestCell(ref.getFamily(), ref.getQualifier());
@@ -271,7 +275,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
@Override
public byte[] getCurrentRowKey() {
- return rowKey;
+ return mutation.getRow();
}
@Override
@@ -279,32 +283,28 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
return Collections.emptyList();
}
- public void applyMutation(Mutation m) {
- if (m instanceof Delete) {
+ public void applyMutation() {
+ if (mutation instanceof Delete) {
valueMap.clear();
} else {
- CellScanner scanner = m.cellScanner();
- try {
- while (scanner.advance()) {
- Cell cell = scanner.current();
- if (cell.getTypeByte() == KeyValue.Type.DeleteColumn.getCode()) {
- ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
- valueMap.remove(ref);
- } else if (cell.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) {
- for (ColumnReference ref : indexedColumns) {
- if (ref.matchesFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())) {
- valueMap.remove(ref);
- }
+ for (Cell cell : pendingUpdates) {
+ if (cell.getTypeByte() == KeyValue.Type.DeleteColumn.getCode()) {
+ ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
+ valueMap.remove(ref);
+ } else if (cell.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) {
+ for (ColumnReference ref : indexedColumns) {
+ if (ref.matchesFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())) {
+ valueMap.remove(ref);
}
- } else {
- ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
+ }
+ } else {
+ ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
+ if (indexedColumns.contains(ref)) {
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
valueMap.put(ref, ptr);
}
}
- } catch (IOException e) {
- throw new RuntimeException(e); // Impossible
}
}
}
@@ -328,7 +328,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
@Override
public byte[] getRowKey() {
- return rowKey;
+ return mutation.getRow();
}
};
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1baa1b6b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index 7f5d527..9cece1c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.sql.SQLException;
import java.util.List;
+import co.cask.tephra.Transaction;
import co.cask.tephra.hbase98.TransactionAwareHTable;
import org.apache.hadoop.hbase.client.HTableInterface;
@@ -87,9 +88,10 @@ public class TableResultIterator extends ExplainTable implements ResultIterator
PTable table = tableRef.getTable();
HTableInterface htable = context.getConnection().getQueryServices().getTable(table.getPhysicalName().getBytes());
if (table.isTransactional()) {
- TransactionAwareHTable txnAware = TransactionUtil.getTransactionAwareHTable(htable);
- context.getConnection().addTxParticipant(txnAware);
- htable = txnAware;
+ TransactionAwareHTable txAware = TransactionUtil.getTransactionAwareHTable(htable);
+ Transaction tx = context.getConnection().getMutationState().getTransaction();
+ txAware.startTx(tx);
+ htable = txAware;
}
this.htable = htable;
if (creationMode == ScannerCreation.IMMEDIATE) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1baa1b6b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index 240a599..d513362 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -54,10 +54,7 @@ import java.util.concurrent.Executor;
import javax.annotation.Nullable;
-import co.cask.tephra.TransactionAware;
-import co.cask.tephra.TransactionContext;
-import co.cask.tephra.TransactionFailureException;
-import co.cask.tephra.TransactionSystemClient;
+import co.cask.tephra.Transaction;
import org.apache.hadoop.hbase.HConstants;
import org.apache.phoenix.call.CallRunner;
@@ -97,7 +94,6 @@ import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SQLCloseable;
import org.apache.phoenix.util.SQLCloseables;
-import org.apache.phoenix.util.TransactionUtil;
import org.cloudera.htrace.Sampler;
import org.cloudera.htrace.TraceScope;
@@ -139,7 +135,6 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
private final String timestampPattern;
private TraceScope traceScope = null;
- private TransactionContext txContext;
private boolean isClosed = false;
private Sampler<?> sampler;
private boolean readOnly = false;
@@ -156,23 +151,22 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
}
public PhoenixConnection(PhoenixConnection connection) throws SQLException {
- this(connection.getQueryServices(), connection.getURL(), connection.getClientInfo(), connection.getMetaDataCache());
+ this(connection.getQueryServices(), connection.getURL(), connection.getClientInfo(), connection.getMetaDataCache(), connection.getMutationState().getTransaction());
this.isAutoCommit = connection.isAutoCommit;
this.sampler = connection.sampler;
}
- public PhoenixConnection(PhoenixConnection connection, long scn) throws SQLException {
- this(connection.getQueryServices(), connection, scn);
- this.sampler = connection.sampler;
- }
-
public PhoenixConnection(ConnectionQueryServices services, PhoenixConnection connection, long scn) throws SQLException {
- this(services, connection.getURL(), newPropsWithSCN(scn,connection.getClientInfo()), connection.getMetaDataCache());
+ this(services, connection.getURL(), newPropsWithSCN(scn,connection.getClientInfo()), connection.getMetaDataCache(), connection.getMutationState().getTransaction());
this.isAutoCommit = connection.isAutoCommit;
this.sampler = connection.sampler;
}
public PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData) throws SQLException {
+ this(services, url, info, metaData, null);
+ }
+
+ public PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData, Transaction txn) throws SQLException {
this.url = url;
// Copy so client cannot change
this.info = info == null ? new Properties() : PropertiesUtil.deepCopy(info);
@@ -242,7 +236,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
}
});
- this.mutationState = new MutationState(maxSize, this);
+ this.mutationState = new MutationState(maxSize, this, txn);
this.services.addConnection(this);
// setup tracing, if its enabled
@@ -250,10 +244,6 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
this.customTracingAnnotations = getImmutableCustomTracingAnnotations();
}
- public TransactionContext getTransactionContext() {
- return txContext;
- }
-
private ImmutableMap<String, String> getImmutableCustomTracingAnnotations() {
Builder<String, String> result = ImmutableMap.builder();
result.putAll(JDBCUtil.getAnnotations(url, info));
@@ -437,60 +427,15 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
}
}
- public void startTransaction() throws SQLException {
- if (txContext == null) {
- boolean success = false;
- try {
- TransactionSystemClient txServiceClient = this.getQueryServices().getTransactionSystemClient();
- this.txContext = new TransactionContext(txServiceClient);
- txContext.start();
- success = true;
- } catch (TransactionFailureException e) {
- throw new SQLException(e); // TODO: error code
- } finally {
- if (!success) endTransaction();
- }
- }
- }
-
- public void addTxParticipant(TransactionAware txnAware) throws SQLException {
- if (!isTransactionStarted()) {
- startTransaction();
- }
- txContext.addTransactionAware(txnAware);
- }
-
- private boolean isTransactionStarted() {
- return txContext != null;
- }
-
- private void endTransaction() {
- txContext = null;
- }
-
@Override
public void commit() throws SQLException {
CallRunner.run(new CallRunner.CallableThrowable<Void, SQLException>() {
@Override
public Void call() throws SQLException {
- mutationState.send();
- if (isTransactionStarted()) {
- try {
- txContext.finish();
- } catch (TransactionFailureException e) {
- try {
- txContext.abort(e);
- throw TransactionUtil.getSQLException(e);
- } catch (TransactionFailureException e1) {
- throw TransactionUtil.getSQLException(e);
- }
- } finally {
- endTransaction();
- }
- }
+ mutationState.commit();
return null;
}
- }, Tracing.withTracing(this, "sending mutations"));
+ }, Tracing.withTracing(this, "committing"));
}
@Override
@@ -690,16 +635,13 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
@Override
public void rollback() throws SQLException {
- mutationState.clear();
- if (isTransactionStarted()) {
- try {
- txContext.abort();
- } catch (TransactionFailureException e) {
- throw new SQLException(e); // TODO: error code
- } finally {
- endTransaction();
+ CallRunner.run(new CallRunner.CallableThrowable<Void, SQLException>() {
+ @Override
+ public Void call() throws SQLException {
+ mutationState.rollback();
+ return null;
}
- }
+ }, Tracing.withTracing(this, "rolling back"));
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1baa1b6b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 3ccc772..fb295d3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -234,9 +234,9 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
final long startTime = System.currentTimeMillis();
try {
QueryPlan plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.RESERVE_SEQUENCE);
+ startTransaction(plan);
plan = connection.getQueryServices().getOptimizer().optimize(
PhoenixStatement.this, plan);
- startTransaction(plan);
// this will create its own trace internally, so we don't wrap this
// whole thing in tracing
ResultIterator resultIterator = plan.iterator();
@@ -279,10 +279,10 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
}
}
- private void startTransaction(StatementPlan plan) throws SQLException {
+ public void startTransaction(StatementPlan plan) throws SQLException {
for (TableRef ref : plan.getContext().getResolver().getTables()) {
if (ref.getTable().isTransactional()) {
- connection.startTransaction();
+ connection.getMutationState().startTransaction();
break;
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1baa1b6b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 37a28fb..01cf8b2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -137,7 +137,6 @@ import org.apache.phoenix.schema.SequenceKey;
import org.apache.phoenix.schema.TableAlreadyExistsException;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.TableProperty;
-import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.stats.PTableStats;
import org.apache.phoenix.schema.stats.StatisticsUtil;
import org.apache.phoenix.schema.types.PBoolean;
@@ -2070,7 +2069,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
@Override
public MutationState updateData(MutationPlan plan) throws SQLException {
- TableRef currentTable = plan.getContext().getCurrentTable();
+ plan.getContext().getStatement().startTransaction(plan);
return plan.execute();
}