You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2015/04/20 08:12:56 UTC
phoenix git commit: Fix TupleProjector to call getTableName instead
of getName, make equality of TableRef ignore alias if either is null,
cache Transaction on context for querying
Repository: phoenix
Updated Branches:
refs/heads/txn 433495482 -> 714284d32
Fix TupleProjector to call getTableName instead of getName, make equality of TableRef ignore alias if either is null, cache Transaction on context for querying
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/714284d3
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/714284d3
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/714284d3
Branch: refs/heads/txn
Commit: 714284d32a4937015e97e6efdb0556b5990313f2
Parents: 4334954
Author: James Taylor <jt...@salesforce.com>
Authored: Sun Apr 19 23:12:50 2015 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Sun Apr 19 23:12:50 2015 -0700
----------------------------------------------------------------------
.../phoenix/transactions/TransactionIT.java | 59 ++++++--
.../phoenix/compile/StatementContext.java | 11 ++
.../compile/TupleProjectionCompiler.java | 2 +-
.../apache/phoenix/compile/UpsertCompiler.java | 9 ++
.../apache/phoenix/execute/CommitException.java | 9 +-
.../apache/phoenix/execute/MutationState.java | 138 +++++++++++--------
.../phoenix/iterate/TableResultIterator.java | 6 +-
.../apache/phoenix/jdbc/PhoenixStatement.java | 27 ++--
.../query/ConnectionQueryServicesImpl.java | 1 -
.../org/apache/phoenix/schema/TableRef.java | 8 +-
.../java/org/apache/phoenix/query/BaseTest.java | 2 -
11 files changed, 171 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/714284d3/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java
index 807234d..31adcb9 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java
@@ -25,15 +25,11 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Map;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
+import co.cask.tephra.TxConstants;
+
import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
import org.apache.phoenix.end2end.Shadower;
import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.util.DateUtil;
import org.apache.phoenix.util.ReadOnlyProps;
@@ -42,19 +38,43 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
-import co.cask.tephra.TxConstants;
-
import com.google.common.collect.Maps;
public class TransactionIT extends BaseHBaseManagedTimeIT {
private static final String FULL_TABLE_NAME = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + TRANSACTIONAL_DATA_TABLE;
- @Before
+ @Before
public void setUp() throws SQLException {
- ensureTableCreated(getUrl(), TRANSACTIONAL_DATA_TABLE);
+ Connection conn = DriverManager.getConnection(getUrl());
+ try {
+ conn.createStatement().execute(
+ "create table "+ FULL_TABLE_NAME + "("
+ + " varchar_pk VARCHAR NOT NULL, "
+ + " char_pk CHAR(6) NOT NULL, "
+ + " int_pk INTEGER NOT NULL, "
+ + " long_pk BIGINT NOT NULL, "
+ + " decimal_pk DECIMAL(31, 10) NOT NULL, "
+ + " date_pk DATE NOT NULL, "
+ + " a.varchar_col1 VARCHAR, "
+ + " a.char_col1 CHAR(10), "
+ + " a.int_col1 INTEGER, "
+ + " a.long_col1 BIGINT, "
+ + " a.decimal_col1 DECIMAL(31, 10), "
+ + " a.date1 DATE, "
+ + " b.varchar_col2 VARCHAR, "
+ + " b.char_col2 CHAR(10), "
+ + " b.int_col2 INTEGER, "
+ + " b.long_col2 BIGINT, "
+ + " b.decimal_col2 DECIMAL(31, 10), "
+ + " b.date2 DATE "
+ + " CONSTRAINT pk PRIMARY KEY (varchar_pk, char_pk, int_pk, long_pk DESC, decimal_pk, date_pk)) "
+ + "TRANSACTIONAL=true");
+ } finally {
+ conn.close();
+ }
}
-
+
@BeforeClass
@Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
public static void doSetup() throws Exception {
@@ -122,18 +142,31 @@ public class TransactionIT extends BaseHBaseManagedTimeIT {
}
@Test
- public void testAutocommitQueryEmptyTable() throws Exception {
+ public void testAutoCommitQuerySingleTable() throws Exception {
Connection conn = DriverManager.getConnection(getUrl());
try {
conn.setAutoCommit(true);
// verify no rows returned
- ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM "+FULL_TABLE_NAME);
+ ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + FULL_TABLE_NAME);
assertFalse(rs.next());
} finally {
conn.close();
}
}
+ @Test
+ public void testAutoCommitQueryMultiTables() throws Exception {
+ Connection conn = DriverManager.getConnection(getUrl());
+ try {
+ conn.setAutoCommit(true);
+ // verify no rows returned
+ ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + FULL_TABLE_NAME + " a JOIN " + FULL_TABLE_NAME + " b ON (a.long_pk = b.int_pk)");
+ assertFalse(rs.next());
+ } finally {
+ conn.close();
+ }
+ }
+
@Test
public void testColConflicts() throws Exception {
Connection conn1 = DriverManager.getConnection(getUrl());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/714284d3/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
index d726488..08ba1b4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -26,6 +26,8 @@ import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
+import co.cask.tephra.Transaction;
+
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.TimeRange;
@@ -78,6 +80,7 @@ public class StatementContext {
private TableRef currentTable;
private List<Pair<byte[], byte[]>> whereConditionColumns;
private TimeRange scanTimeRange = null;
+ private Transaction transaction;
private Map<SelectStatement, Object> subqueryResults;
@@ -285,4 +288,12 @@ public class StatementContext {
public void setSubqueryResult(SelectStatement select, Object result) {
subqueryResults.put(select, result);
}
+
+ public Transaction getTransaction() {
+ return transaction;
+ }
+
+ public void setTransaction(Transaction transaction) {
+ this.transaction = transaction;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/714284d3/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
index b41107b..d15ee7f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
@@ -146,7 +146,7 @@ public class TupleProjectionCompiler {
projectedColumns.add(column);
}
- return PTableImpl.makePTable(table.getTenantId(), table.getSchemaName(), table.getName(), PTableType.PROJECTED,
+ return PTableImpl.makePTable(table.getTenantId(), table.getSchemaName(), table.getTableName(), PTableType.PROJECTED,
table.getIndexState(), table.getTimeStamp(), table.getSequenceNumber(), table.getPKName(),
table.getBucketNum(), projectedColumns, table.getParentSchemaName(),
table.getParentName(), table.getIndexes(), table.isImmutableRows(), Collections.<PName>emptyList(), null, null,
http://git-wip-us.apache.org/repos/asf/phoenix/blob/714284d3/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 67d289e..8e38ffc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -477,6 +477,7 @@ public class UpsertCompiler {
break;
}
+ final QueryPlan originalQueryPlan = queryPlanToBe;
RowProjector projectorToBe = null;
// Optimize only after all checks have been performed
if (valueNodes == null) {
@@ -674,6 +675,14 @@ public class UpsertCompiler {
@Override
public MutationState execute() throws SQLException {
+ // Repeated from PhoenixStatement.executeQuery which this call bypasses.
+ // Send mutations to hbase, so they are visible to subsequent reads.
+ // Use original plan for data table so that data and immutable indexes will be sent.
+ boolean isTransactional = connection.getMutationState().startTransaction(originalQueryPlan.getContext().getResolver().getTables().iterator());
+ if (isTransactional) {
+ // Use real query plan so that we have the right context object.
+ queryPlan.getContext().setTransaction(connection.getMutationState().getTransaction());
+ }
ResultIterator iterator = queryPlan.iterator();
if (parallelIteratorFactory == null) {
return upsertSelect(statement, tableRef, projector, iterator, columnIndexes, pkSlotIndexes);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/714284d3/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
index 63bf6a1..6cbb06d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
@@ -22,20 +22,13 @@ import java.sql.SQLException;
public class CommitException extends SQLException {
private static final long serialVersionUID = 1L;
private final MutationState uncommittedState;
- private final MutationState committedState;
- public CommitException(Exception e, MutationState uncommittedState, MutationState committedState) {
+ public CommitException(Exception e, MutationState uncommittedState) {
super(e);
this.uncommittedState = uncommittedState;
- this.committedState = committedState;
}
public MutationState getUncommittedState() {
return uncommittedState;
}
-
- public MutationState getCommittedState() {
- return committedState;
- }
-
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/714284d3/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index a0cf8d2..e2b6968 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -72,6 +72,7 @@ import org.cloudera.htrace.TraceScope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Predicate;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -140,20 +141,6 @@ public class MutationState implements SQLCloseable {
throwIfTooBig();
}
- private MutationState(MutationState state, List<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> entries) {
- this.maxSize = state.maxSize;
- this.connection = state.connection;
- this.sizeOffset = state.sizeOffset;
- this.tx = state.tx;
- this.txAwares = state.txAwares;
- this.txContext = state.txContext;
- for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : entries) {
- numRows += entry.getValue().size();
- this.mutations.put(entry.getKey(), entry.getValue());
- }
- throwIfTooBig();
- }
-
private void addTxParticipant(TransactionAware txAware) throws SQLException {
if (txContext == null) {
txAwares.add(txAware);
@@ -168,7 +155,7 @@ public class MutationState implements SQLCloseable {
return tx != null ? tx : txContext != null ? txContext.getCurrentTransaction() : null;
}
- public void startTransaction() throws SQLException {
+ public boolean startTransaction() throws SQLException {
if (txContext == null) {
throw new SQLException("No transaction context"); // TODO: error code
}
@@ -177,10 +164,12 @@ public class MutationState implements SQLCloseable {
if (!txStarted) {
txContext.start();
txStarted = true;
+ return true;
}
} catch (TransactionFailureException e) {
throw new SQLException(e); // TODO: error code
}
+ return false;
}
private void throwIfTooBig() {
@@ -372,46 +361,50 @@ public class MutationState implements SQLCloseable {
* @return the server time to use for the upsert
* @throws SQLException if the table or any columns no longer exist
*/
- private long[] validate() throws SQLException {
+ private long[] validateAll() throws SQLException {
int i = 0;
- Long scn = connection.getSCN();
- MetaDataClient client = new MetaDataClient(connection);
long[] timeStamps = new long[this.mutations.size()];
for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : mutations.entrySet()) {
TableRef tableRef = entry.getKey();
- long serverTimeStamp = tableRef.getTimeStamp();
- PTable table = tableRef.getTable();
- // If we're auto committing, we've already validated the schema when we got the ColumnResolver,
- // so no need to do it again here.
- if (!connection.getAutoCommit()) {
- MetaDataMutationResult result = client.updateCache(table.getSchemaName().getString(), table.getTableName().getString());
- long timestamp = result.getMutationTime();
- if (timestamp != QueryConstants.UNSET_TIMESTAMP) {
- serverTimeStamp = timestamp;
- if (result.wasUpdated()) {
- // TODO: use bitset?
- table = result.getTable();
- PColumn[] columns = new PColumn[table.getColumns().size()];
- for (Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry : entry.getValue().entrySet()) {
- Map<PColumn,byte[]> valueEntry = rowEntry.getValue();
- if (valueEntry != PRow.DELETE_MARKER) {
- for (PColumn column : valueEntry.keySet()) {
- columns[column.getPosition()] = column;
- }
+ timeStamps[i++] = validate(tableRef, entry.getValue());
+ }
+ return timeStamps;
+ }
+
+ private long validate(TableRef tableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>> values) throws SQLException {
+ Long scn = connection.getSCN();
+ MetaDataClient client = new MetaDataClient(connection);
+ long serverTimeStamp = tableRef.getTimeStamp();
+ PTable table = tableRef.getTable();
+ // If we're auto committing, we've already validated the schema when we got the ColumnResolver,
+ // so no need to do it again here.
+ if (!connection.getAutoCommit()) {
+ MetaDataMutationResult result = client.updateCache(table.getSchemaName().getString(), table.getTableName().getString());
+ long timestamp = result.getMutationTime();
+ if (timestamp != QueryConstants.UNSET_TIMESTAMP) {
+ serverTimeStamp = timestamp;
+ if (result.wasUpdated()) {
+ // TODO: use bitset?
+ table = result.getTable();
+ PColumn[] columns = new PColumn[table.getColumns().size()];
+ for (Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry : values.entrySet()) {
+ Map<PColumn,byte[]> valueEntry = rowEntry.getValue();
+ if (valueEntry != PRow.DELETE_MARKER) {
+ for (PColumn column : valueEntry.keySet()) {
+ columns[column.getPosition()] = column;
}
}
- for (PColumn column : columns) {
- if (column != null) {
- table.getColumnFamily(column.getFamilyName().getString()).getColumn(column.getName().getString());
- }
+ }
+ for (PColumn column : columns) {
+ if (column != null) {
+ table.getColumnFamily(column.getFamilyName().getString()).getColumn(column.getName().getString());
}
- tableRef.setTable(table);
}
+ tableRef.setTable(table);
}
}
- timeStamps[i++] = scn == null ? serverTimeStamp == QueryConstants.UNSET_TIMESTAMP ? HConstants.LATEST_TIMESTAMP : serverTimeStamp : scn;
}
- return timeStamps;
+ return scn == null ? serverTimeStamp == QueryConstants.UNSET_TIMESTAMP ? HConstants.LATEST_TIMESTAMP : serverTimeStamp : scn;
}
private static void logMutationSize(HTableInterface htable, List<Mutation> mutations, PhoenixConnection connection) {
@@ -429,25 +422,31 @@ public class MutationState implements SQLCloseable {
}
@SuppressWarnings("deprecation")
- public void send() throws SQLException {
+ private void send(Iterator<TableRef> tableRefs) throws SQLException {
int i = 0;
+ long[] serverTimeStamps = null;
byte[] tenantId = connection.getTenantId() == null ? null : connection.getTenantId().getBytes();
- long[] serverTimeStamps = validate();
- Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> iterator = this.mutations.entrySet().iterator();
- List<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> committedList = Lists.newArrayListWithCapacity(this.mutations.size());
+ // Validate up front if not transactional so that we fail before any mutations have been sent
+ if (tableRefs == null) {
+ serverTimeStamps = validateAll();
+ tableRefs = mutations.keySet().iterator();
+ }
// add tracing for this operation
TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables");
Span span = trace.getSpan();
- while (iterator.hasNext()) {
- Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry = iterator.next();
- Map<ImmutableBytesPtr,Map<PColumn,byte[]>> valuesMap = entry.getValue();
- TableRef tableRef = entry.getKey();
+ while (tableRefs.hasNext()) {
+ TableRef tableRef = tableRefs.next();
+ Map<ImmutableBytesPtr,Map<PColumn,byte[]>> valuesMap = mutations.get(tableRef);
+ if (valuesMap == null) {
+ continue;
+ }
PTable table = tableRef.getTable();
table.getIndexMaintainers(tempPtr, connection);
boolean hasIndexMaintainers = tempPtr.getLength() > 0;
boolean isDataTable = true;
- long serverTimestamp = serverTimeStamps[i++];
+ // Validate as we go if transactional since we can undo if a problem occurs (which is unlikely)
+ long serverTimestamp = serverTimeStamps == null ? validate(tableRef, valuesMap) : serverTimeStamps[i++];
Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, serverTimestamp, false);
while (mutationsIterator.hasNext()) {
Pair<byte[],List<Mutation>> pair = mutationsIterator.next();
@@ -518,7 +517,6 @@ public class MutationState implements SQLCloseable {
MUTATION_COMMIT_TIME.update(duration);
shouldRetry = false;
if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("Total time for batch call of " + mutations.size() + " mutations into " + table.getName().getString() + ": " + duration + " ms", connection));
- committedList.add(entry);
} catch (Exception e) {
SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e);
if (inferredE != null) {
@@ -541,7 +539,7 @@ public class MutationState implements SQLCloseable {
}
// Throw to client with both what was committed so far and what is left to be committed.
// That way, client can either undo what was done or try again with what was not done.
- sqlE = new CommitException(e, this, new MutationState(this, committedList));
+ sqlE = new CommitException(e, this);
} finally {
try {
if (cache != null) {
@@ -567,9 +565,9 @@ public class MutationState implements SQLCloseable {
isDataTable = false;
}
if (tableRef.getTable().getType() != PTableType.INDEX) {
- numRows -= entry.getValue().size();
+ numRows -= valuesMap.size();
}
- iterator.remove(); // Remove batches as we process them
+ valuesMap.remove(tableRef); // Remove batches as we process them
}
trace.close();
assert(numRows==0);
@@ -624,4 +622,30 @@ public class MutationState implements SQLCloseable {
}
}
}
+
+ /**
+ * Send mutations to hbase, so they are visible to subsequent reads,
+ * starting a transaction if transactional and one has not yet been started.
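+ * @param tableRefs the tables referenced by the statement; only the transactional ones have their pending mutations sent
+ * @return true if at least partially transactional and false otherwise.
+ * @throws SQLException if there is no transaction context, the transaction cannot be started, or sending the mutations fails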
+ */
+ public boolean startTransaction(Iterator<TableRef> tableRefs) throws SQLException {
+ Iterator<TableRef> filteredTableRefs = Iterators.filter(tableRefs, new Predicate<TableRef>(){
+ @Override
+ public boolean apply(TableRef tableRef) {
+ return tableRef.getTable().isTransactional();
+ }
+ });
+ if (filteredTableRefs.hasNext()) {
+ startTransaction();
+ send(filteredTableRefs);
+ return true;
+ }
+ return false;
+ }
+
+ public void send() throws SQLException {
+ send(null);
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/714284d3/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index 9cece1c..ad95ef9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -87,9 +87,11 @@ public class TableResultIterator extends ExplainTable implements ResultIterator
this.scan = scan;
PTable table = tableRef.getTable();
HTableInterface htable = context.getConnection().getQueryServices().getTable(table.getPhysicalName().getBytes());
- if (table.isTransactional()) {
+ Transaction tx;
+ if (table.isTransactional() && (tx=context.getTransaction()) != null) {
TransactionAwareHTable txAware = TransactionUtil.getTransactionAwareHTable(htable);
- Transaction tx = context.getConnection().getMutationState().getTransaction();
+ // Use transaction cached on context as we may have started a new transaction already
+ // if auto commit is true.
txAware.startTx(tx);
htable = txAware;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/714284d3/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index fb295d3..ef2a233 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -234,19 +234,17 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
final long startTime = System.currentTimeMillis();
try {
QueryPlan plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.RESERVE_SEQUENCE);
- startTransaction(plan);
- plan = connection.getQueryServices().getOptimizer().optimize(
- PhoenixStatement.this, plan);
+ // Send mutations to hbase, so they are visible to subsequent reads.
+ // Use original plan for data table so that data and immutable indexes will be sent
+ boolean isTransactional = connection.getMutationState().startTransaction(plan.getContext().getResolver().getTables().iterator());
+ plan = connection.getQueryServices().getOptimizer().optimize(PhoenixStatement.this, plan);
+ if (isTransactional) {
+ // After optimize so that we have the right context object
+ plan.getContext().setTransaction(connection.getMutationState().getTransaction());
+ }
// this will create its own trace internally, so we don't wrap this
// whole thing in tracing
ResultIterator resultIterator = plan.iterator();
- if (connection.getAutoCommit()) {
- connection.commit(); // Forces new read point for next statement
- }
- else {
- // send mutations to hbase, so they are visible to subsequent reads
- connection.getMutationState().send();
- }
if (logger.isDebugEnabled()) {
String explainPlan = QueryUtil.getExplainPlan(resultIterator);
logger.debug(LogUtil.addCustomAnnotations("Explain plan: " + explainPlan, connection));
@@ -257,6 +255,10 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
setLastResultSet(rs);
setLastUpdateCount(NO_UPDATE);
setLastUpdateOperation(stmt.getOperation());
+ // If transactional, this will move the read pointer forward
+ if (connection.getAutoCommit()) {
+ connection.commit();
+ }
return rs;
} catch (RuntimeException e) {
// FIXME: Expression.evaluate does not throw SQLException
@@ -279,13 +281,14 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
}
}
- public void startTransaction(StatementPlan plan) throws SQLException {
+ private boolean startTransaction(StatementPlan plan) throws SQLException {
for (TableRef ref : plan.getContext().getResolver().getTables()) {
if (ref.getTable().isTransactional()) {
connection.getMutationState().startTransaction();
- break;
+ return true;
}
}
+ return false;
}
protected int executeMutation(final CompilableStatement stmt) throws SQLException {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/714284d3/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 01cf8b2..1655a57 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -2069,7 +2069,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
@Override
public MutationState updateData(MutationPlan plan) throws SQLException {
- plan.getContext().getStatement().startTransaction(plan);
return plan.execute();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/714284d3/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java
index bd88770..b5351bf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java
@@ -97,10 +97,7 @@ public class TableRef {
@Override
public int hashCode() {
- final int prime = 31;
- int result = alias == null ? 0 : alias.hashCode();
- result = prime * result + this.table.getName().getString().hashCode();
- return result;
+ return this.table.getName().getString().hashCode();
}
@Override
@@ -109,7 +106,8 @@ public class TableRef {
if (obj == null) return false;
if (getClass() != obj.getClass()) return false;
TableRef other = (TableRef)obj;
- if ((alias == null && other.alias != null) || (alias != null && !alias.equals(other.alias))) return false;
+ // If alias is null, it matches any other alias
+ if (alias != null && other.alias != null && !alias.equals(other.alias)) return false;
if (!table.getName().getString().equals(other.table.getName().getString())) return false;
return true;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/714284d3/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 03f7d0f..4ab438b 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -81,7 +81,6 @@ import static org.apache.phoenix.util.TestUtil.STABLE_NAME;
import static org.apache.phoenix.util.TestUtil.TABLE_WITH_ARRAY;
import static org.apache.phoenix.util.TestUtil.TABLE_WITH_SALTING;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.apache.phoenix.util.TestUtil.TRANSACTIONAL_DATA_TABLE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -418,7 +417,6 @@ public abstract class BaseTest {
" kv bigint)\n");
builder.put(INDEX_DATA_TABLE, "create table " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + INDEX_DATA_TABLE + TEST_TABLE_SCHEMA + "IMMUTABLE_ROWS=true");
builder.put(MUTABLE_INDEX_DATA_TABLE, "create table " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + MUTABLE_INDEX_DATA_TABLE + TEST_TABLE_SCHEMA);
- builder.put(TRANSACTIONAL_DATA_TABLE, "create table " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + TRANSACTIONAL_DATA_TABLE + TEST_TABLE_SCHEMA + "TRANSACTIONAL=true");
builder.put("SumDoubleTest","create table SumDoubleTest" +
" (id varchar not null primary key, d DOUBLE, f FLOAT, ud UNSIGNED_DOUBLE, uf UNSIGNED_FLOAT, i integer, de decimal)");
builder.put(JOIN_ORDER_TABLE_FULL_NAME, "create table " + JOIN_ORDER_TABLE_FULL_NAME +