Posted to commits@phoenix.apache.org by ja...@apache.org on 2015/04/20 08:12:56 UTC

phoenix git commit: Fix TupleProjectionCompiler to call getTableName() instead of getName(), make TableRef equality ignore the alias when either is null, and cache the Transaction on the StatementContext for querying

Repository: phoenix
Updated Branches:
  refs/heads/txn 433495482 -> 714284d32


Fix TupleProjectionCompiler to call getTableName() instead of getName(), make TableRef equality ignore the alias when either is null, and cache the Transaction on the StatementContext for querying
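
For reference, a minimal standalone sketch of the relaxed alias check introduced by the TableRef.equals() change below. The class and method names here (TableRefAliasCheckSketch, aliasesCompatible) are illustrative only, not Phoenix API; the actual change lives in org.apache.phoenix.schema.TableRef. A null alias on either side now acts as a wildcard, so only two non-null, differing aliases make two references to the same table unequal:

    public final class TableRefAliasCheckSketch {
        // Mirrors the new rule in TableRef.equals(): a null alias matches any alias;
        // equality is only broken by two non-null aliases that differ (the table
        // name itself must still match, as before).
        static boolean aliasesCompatible(String a, String b) {
            return a == null || b == null || a.equals(b);
        }

        public static void main(String[] args) {
            System.out.println(aliasesCompatible(null, "t")); // true (was false before this commit)
            System.out.println(aliasesCompatible("t", "t"));  // true
            System.out.println(aliasesCompatible("t", "u"));  // false
        }
    }

Note that hashCode() is changed in the same diff to hash only the table name, keeping it consistent with the relaxed equals().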


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/714284d3
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/714284d3
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/714284d3

Branch: refs/heads/txn
Commit: 714284d32a4937015e97e6efdb0556b5990313f2
Parents: 4334954
Author: James Taylor <jt...@salesforce.com>
Authored: Sun Apr 19 23:12:50 2015 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Sun Apr 19 23:12:50 2015 -0700

----------------------------------------------------------------------
 .../phoenix/transactions/TransactionIT.java     |  59 ++++++--
 .../phoenix/compile/StatementContext.java       |  11 ++
 .../compile/TupleProjectionCompiler.java        |   2 +-
 .../apache/phoenix/compile/UpsertCompiler.java  |   9 ++
 .../apache/phoenix/execute/CommitException.java |   9 +-
 .../apache/phoenix/execute/MutationState.java   | 138 +++++++++++--------
 .../phoenix/iterate/TableResultIterator.java    |   6 +-
 .../apache/phoenix/jdbc/PhoenixStatement.java   |  27 ++--
 .../query/ConnectionQueryServicesImpl.java      |   1 -
 .../org/apache/phoenix/schema/TableRef.java     |   8 +-
 .../java/org/apache/phoenix/query/BaseTest.java |   2 -
 11 files changed, 171 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/714284d3/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java
index 807234d..31adcb9 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java
@@ -25,15 +25,11 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Map;
 
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
+import co.cask.tephra.TxConstants;
+
 import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
 import org.apache.phoenix.end2end.Shadower;
 import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
@@ -42,19 +38,43 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import co.cask.tephra.TxConstants;
-
 import com.google.common.collect.Maps;
 
 public class TransactionIT extends BaseHBaseManagedTimeIT {
 	
 	private static final String FULL_TABLE_NAME = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + TRANSACTIONAL_DATA_TABLE;
 	
-	@Before
+    @Before
     public void setUp() throws SQLException {
-		ensureTableCreated(getUrl(), TRANSACTIONAL_DATA_TABLE);
+        Connection conn = DriverManager.getConnection(getUrl());
+        try {
+            conn.createStatement().execute(
+                  "create table "+ FULL_TABLE_NAME + "("
+                + "   varchar_pk VARCHAR NOT NULL, "
+                + "   char_pk CHAR(6) NOT NULL, "
+                + "   int_pk INTEGER NOT NULL, "
+                + "   long_pk BIGINT NOT NULL, "
+                + "   decimal_pk DECIMAL(31, 10) NOT NULL, "
+                + "   date_pk DATE NOT NULL, "
+                + "   a.varchar_col1 VARCHAR, "
+                + "   a.char_col1 CHAR(10), "
+                + "   a.int_col1 INTEGER, "
+                + "   a.long_col1 BIGINT, "
+                + "   a.decimal_col1 DECIMAL(31, 10), "
+                + "   a.date1 DATE, "
+                + "   b.varchar_col2 VARCHAR, "
+                + "   b.char_col2 CHAR(10), "
+                + "   b.int_col2 INTEGER, "
+                + "   b.long_col2 BIGINT, "
+                + "   b.decimal_col2 DECIMAL(31, 10), "
+                + "   b.date2 DATE "
+                + "   CONSTRAINT pk PRIMARY KEY (varchar_pk, char_pk, int_pk, long_pk DESC, decimal_pk, date_pk)) "
+                + "TRANSACTIONAL=true");
+        } finally {
+            conn.close();
+        }
     }
-	
+
 	@BeforeClass
     @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
     public static void doSetup() throws Exception {
@@ -122,18 +142,31 @@ public class TransactionIT extends BaseHBaseManagedTimeIT {
 	}
 	
 	@Test
-	public void testAutocommitQueryEmptyTable() throws Exception {
+	public void testAutoCommitQuerySingleTable() throws Exception {
 		Connection conn = DriverManager.getConnection(getUrl());
 		try {
 			conn.setAutoCommit(true);
 			// verify no rows returned
-			ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM "+FULL_TABLE_NAME);
+			ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + FULL_TABLE_NAME);
 			assertFalse(rs.next());
 		} finally {
 			conn.close();
 		}
 	}
 	
+    @Test
+    public void testAutoCommitQueryMultiTables() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        try {
+            conn.setAutoCommit(true);
+            // verify no rows returned
+            ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + FULL_TABLE_NAME + " a JOIN " + FULL_TABLE_NAME + " b ON (a.long_pk = b.int_pk)");
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+    
 	@Test
 	public void testColConflicts() throws Exception {
 		Connection conn1 = DriverManager.getConnection(getUrl());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/714284d3/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
index d726488..08ba1b4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -26,6 +26,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TimeZone;
 
+import co.cask.tephra.Transaction;
+
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.TimeRange;
@@ -78,6 +80,7 @@ public class StatementContext {
     private TableRef currentTable;
     private List<Pair<byte[], byte[]>> whereConditionColumns;
     private TimeRange scanTimeRange = null;
+    private Transaction transaction;
 
     private Map<SelectStatement, Object> subqueryResults;
 
@@ -285,4 +288,12 @@ public class StatementContext {
     public void setSubqueryResult(SelectStatement select, Object result) {
         subqueryResults.put(select, result);
     }
+
+    public Transaction getTransaction() {
+        return transaction;
+    }
+
+    public void setTransaction(Transaction transaction) {
+        this.transaction = transaction;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/714284d3/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
index b41107b..d15ee7f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
@@ -146,7 +146,7 @@ public class TupleProjectionCompiler {
             projectedColumns.add(column);
         }
         
-        return PTableImpl.makePTable(table.getTenantId(), table.getSchemaName(), table.getName(), PTableType.PROJECTED,
+        return PTableImpl.makePTable(table.getTenantId(), table.getSchemaName(), table.getTableName(), PTableType.PROJECTED,
                 table.getIndexState(), table.getTimeStamp(), table.getSequenceNumber(), table.getPKName(),
                 table.getBucketNum(), projectedColumns, table.getParentSchemaName(),
                 table.getParentName(), table.getIndexes(), table.isImmutableRows(), Collections.<PName>emptyList(), null, null,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/714284d3/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 67d289e..8e38ffc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -477,6 +477,7 @@ public class UpsertCompiler {
             break;
         }
         
+        final QueryPlan originalQueryPlan = queryPlanToBe;
         RowProjector projectorToBe = null;
         // Optimize only after all checks have been performed
         if (valueNodes == null) {
@@ -674,6 +675,14 @@ public class UpsertCompiler {
 
                 @Override
                 public MutationState execute() throws SQLException {
+                    // Repeated from PhoenixStatement.executeQuery which this call bypasses.
+                    // Send mutations to hbase, so they are visible to subsequent reads.
+                    // Use original plan for data table so that data and immutable indexes will be sent.
+                    boolean isTransactional = connection.getMutationState().startTransaction(originalQueryPlan.getContext().getResolver().getTables().iterator());
+                    if (isTransactional) {
+                        // Use real query plan  so that we have the right context object.
+                        queryPlan.getContext().setTransaction(connection.getMutationState().getTransaction());
+                    }
                     ResultIterator iterator = queryPlan.iterator();
                     if (parallelIteratorFactory == null) {
                         return upsertSelect(statement, tableRef, projector, iterator, columnIndexes, pkSlotIndexes);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/714284d3/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
index 63bf6a1..6cbb06d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
@@ -22,20 +22,13 @@ import java.sql.SQLException;
 public class CommitException extends SQLException {
     private static final long serialVersionUID = 1L;
     private final MutationState uncommittedState;
-    private final MutationState committedState;
 
-    public CommitException(Exception e, MutationState uncommittedState, MutationState committedState) {
+    public CommitException(Exception e, MutationState uncommittedState) {
         super(e);
         this.uncommittedState = uncommittedState;
-        this.committedState = committedState;
     }
 
     public MutationState getUncommittedState() {
         return uncommittedState;
     }
-
-    public MutationState getCommittedState() {
-        return committedState;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/714284d3/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index a0cf8d2..e2b6968 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -72,6 +72,7 @@ import org.cloudera.htrace.TraceScope;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Predicate;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -140,20 +141,6 @@ public class MutationState implements SQLCloseable {
         throwIfTooBig();
     }
     
-    private MutationState(MutationState state, List<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> entries) {
-        this.maxSize = state.maxSize;
-        this.connection = state.connection;
-        this.sizeOffset = state.sizeOffset;
-        this.tx = state.tx;
-        this.txAwares = state.txAwares;
-        this.txContext = state.txContext;
-        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : entries) {
-            numRows += entry.getValue().size();
-            this.mutations.put(entry.getKey(), entry.getValue());
-        }
-        throwIfTooBig();
-    }
-    
     private void addTxParticipant(TransactionAware txAware) throws SQLException {
         if (txContext == null) {
             txAwares.add(txAware);
@@ -168,7 +155,7 @@ public class MutationState implements SQLCloseable {
         return tx != null ? tx : txContext != null ? txContext.getCurrentTransaction() : null;
     }
     
-    public void startTransaction() throws SQLException {
+    public boolean startTransaction() throws SQLException {
         if (txContext == null) {
             throw new SQLException("No transaction context"); // TODO: error code
         }
@@ -177,10 +164,12 @@ public class MutationState implements SQLCloseable {
             if (!txStarted) {
                 txContext.start();
                 txStarted = true;
+                return true;
             }
         } catch (TransactionFailureException e) {
             throw new SQLException(e); // TODO: error code
         }
+        return false;
     }
     
     private void throwIfTooBig() {
@@ -372,46 +361,50 @@ public class MutationState implements SQLCloseable {
      * @return the server time to use for the upsert
      * @throws SQLException if the table or any columns no longer exist
      */
-    private long[] validate() throws SQLException {
+    private long[] validateAll() throws SQLException {
         int i = 0;
-        Long scn = connection.getSCN();
-        MetaDataClient client = new MetaDataClient(connection);
         long[] timeStamps = new long[this.mutations.size()];
         for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : mutations.entrySet()) {
             TableRef tableRef = entry.getKey();
-            long serverTimeStamp = tableRef.getTimeStamp();
-            PTable table = tableRef.getTable();
-            // If we're auto committing, we've already validated the schema when we got the ColumnResolver,
-            // so no need to do it again here.
-            if (!connection.getAutoCommit()) {
-                MetaDataMutationResult result = client.updateCache(table.getSchemaName().getString(), table.getTableName().getString());
-                long timestamp = result.getMutationTime();
-                if (timestamp != QueryConstants.UNSET_TIMESTAMP) {
-                    serverTimeStamp = timestamp;
-                    if (result.wasUpdated()) {
-                        // TODO: use bitset?
-                        table = result.getTable();
-                        PColumn[] columns = new PColumn[table.getColumns().size()];
-                        for (Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry : entry.getValue().entrySet()) {
-                            Map<PColumn,byte[]> valueEntry = rowEntry.getValue();
-                            if (valueEntry != PRow.DELETE_MARKER) {
-                                for (PColumn column : valueEntry.keySet()) {
-                                    columns[column.getPosition()] = column;
-                                }
+            timeStamps[i++] = validate(tableRef, entry.getValue());
+        }
+        return timeStamps;
+    }
+    
+    private long validate(TableRef tableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>> values) throws SQLException {
+        Long scn = connection.getSCN();
+        MetaDataClient client = new MetaDataClient(connection);
+        long serverTimeStamp = tableRef.getTimeStamp();
+        PTable table = tableRef.getTable();
+        // If we're auto committing, we've already validated the schema when we got the ColumnResolver,
+        // so no need to do it again here.
+        if (!connection.getAutoCommit()) {
+            MetaDataMutationResult result = client.updateCache(table.getSchemaName().getString(), table.getTableName().getString());
+            long timestamp = result.getMutationTime();
+            if (timestamp != QueryConstants.UNSET_TIMESTAMP) {
+                serverTimeStamp = timestamp;
+                if (result.wasUpdated()) {
+                    // TODO: use bitset?
+                    table = result.getTable();
+                    PColumn[] columns = new PColumn[table.getColumns().size()];
+                    for (Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry : values.entrySet()) {
+                        Map<PColumn,byte[]> valueEntry = rowEntry.getValue();
+                        if (valueEntry != PRow.DELETE_MARKER) {
+                            for (PColumn column : valueEntry.keySet()) {
+                                columns[column.getPosition()] = column;
                             }
                         }
-                        for (PColumn column : columns) {
-                            if (column != null) {
-                                table.getColumnFamily(column.getFamilyName().getString()).getColumn(column.getName().getString());
-                            }
+                    }
+                    for (PColumn column : columns) {
+                        if (column != null) {
+                            table.getColumnFamily(column.getFamilyName().getString()).getColumn(column.getName().getString());
                         }
-                        tableRef.setTable(table);
                     }
+                    tableRef.setTable(table);
                 }
             }
-            timeStamps[i++] = scn == null ? serverTimeStamp == QueryConstants.UNSET_TIMESTAMP ? HConstants.LATEST_TIMESTAMP : serverTimeStamp : scn;
         }
-        return timeStamps;
+        return scn == null ? serverTimeStamp == QueryConstants.UNSET_TIMESTAMP ? HConstants.LATEST_TIMESTAMP : serverTimeStamp : scn;
     }
     
     private static void logMutationSize(HTableInterface htable, List<Mutation> mutations, PhoenixConnection connection) {
@@ -429,25 +422,31 @@ public class MutationState implements SQLCloseable {
     }
     
     @SuppressWarnings("deprecation")
-    public void send() throws SQLException {
+    private void send(Iterator<TableRef> tableRefs) throws SQLException {
         int i = 0;
+        long[] serverTimeStamps = null;
         byte[] tenantId = connection.getTenantId() == null ? null : connection.getTenantId().getBytes();
-        long[] serverTimeStamps = validate();
-        Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> iterator = this.mutations.entrySet().iterator();
-        List<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> committedList = Lists.newArrayListWithCapacity(this.mutations.size());
+        // Validate up front if not transactional so that we 
+        if (tableRefs == null) {
+            serverTimeStamps = validateAll();
+            tableRefs = mutations.keySet().iterator();
+        }
 
         // add tracing for this operation
         TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables");
         Span span = trace.getSpan();
-        while (iterator.hasNext()) {
-            Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry = iterator.next();
-            Map<ImmutableBytesPtr,Map<PColumn,byte[]>> valuesMap = entry.getValue();
-            TableRef tableRef = entry.getKey();
+        while (tableRefs.hasNext()) {
+            TableRef tableRef = tableRefs.next();
+            Map<ImmutableBytesPtr,Map<PColumn,byte[]>> valuesMap = mutations.get(tableRef);
+            if (valuesMap == null) {
+                continue;
+            }
             PTable table = tableRef.getTable();
             table.getIndexMaintainers(tempPtr, connection);
             boolean hasIndexMaintainers = tempPtr.getLength() > 0;
             boolean isDataTable = true;
-            long serverTimestamp = serverTimeStamps[i++];
+            // Validate as we go if transactional since we can undo if a problem occurs (which is unlikely)
+            long serverTimestamp = serverTimeStamps == null ? validate(tableRef, valuesMap) : serverTimeStamps[i++];
             Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, serverTimestamp, false);
             while (mutationsIterator.hasNext()) {
                 Pair<byte[],List<Mutation>> pair = mutationsIterator.next();
@@ -518,7 +517,6 @@ public class MutationState implements SQLCloseable {
                         MUTATION_COMMIT_TIME.update(duration);
                         shouldRetry = false;
                         if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("Total time for batch call of  " + mutations.size() + " mutations into " + table.getName().getString() + ": " + duration + " ms", connection));
-                        committedList.add(entry);
                     } catch (Exception e) {
                         SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e);
                         if (inferredE != null) {
@@ -541,7 +539,7 @@ public class MutationState implements SQLCloseable {
                         }
                         // Throw to client with both what was committed so far and what is left to be committed.
                         // That way, client can either undo what was done or try again with what was not done.
-                        sqlE = new CommitException(e, this, new MutationState(this, committedList));
+                        sqlE = new CommitException(e, this);
                     } finally {
                         try {
                             if (cache != null) {
@@ -567,9 +565,9 @@ public class MutationState implements SQLCloseable {
                 isDataTable = false;
             }
             if (tableRef.getTable().getType() != PTableType.INDEX) {
-                numRows -= entry.getValue().size();
+                numRows -= valuesMap.size();
             }
-            iterator.remove(); // Remove batches as we process them
+            valuesMap.remove(tableRef); // Remove batches as we process them
         }
         trace.close();
         assert(numRows==0);
@@ -624,4 +622,30 @@ public class MutationState implements SQLCloseable {
             }
         }
     }
+
+    /**
+     * Send mutations to hbase, so they are visible to subsequent reads,
+     * starting a transaction if transactional and one has not yet been started.
+     * @param tableRefs
+     * @return true if at least partially transactional and false otherwise.
+     * @throws SQLException
+     */
+    public boolean startTransaction(Iterator<TableRef> tableRefs) throws SQLException {
+        Iterator<TableRef> filteredTableRefs = Iterators.filter(tableRefs, new Predicate<TableRef>(){
+            @Override
+            public boolean apply(TableRef tableRef) {
+                return tableRef.getTable().isTransactional();
+            }
+        });
+        if (filteredTableRefs.hasNext()) {
+            startTransaction();
+            send(filteredTableRefs);
+            return true;
+        }
+        return false;
+    }
+        
+    public void send() throws SQLException {
+        send(null);
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/714284d3/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index 9cece1c..ad95ef9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -87,9 +87,11 @@ public class TableResultIterator extends ExplainTable implements ResultIterator
         this.scan = scan;
         PTable table = tableRef.getTable();
         HTableInterface htable = context.getConnection().getQueryServices().getTable(table.getPhysicalName().getBytes());
-        if (table.isTransactional()) {
+        Transaction tx;
+        if (table.isTransactional() && (tx=context.getTransaction()) != null) {
             TransactionAwareHTable txAware = TransactionUtil.getTransactionAwareHTable(htable);
-            Transaction tx = context.getConnection().getMutationState().getTransaction(); 
+            // Use transaction cached on context as we may have started a new transaction already
+            // if auto commit is true.
             txAware.startTx(tx);
             htable = txAware;
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/714284d3/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index fb295d3..ef2a233 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -234,19 +234,17 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
                     final long startTime = System.currentTimeMillis();
                     try {
                         QueryPlan plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.RESERVE_SEQUENCE);
-                        startTransaction(plan);
-                        plan = connection.getQueryServices().getOptimizer().optimize(
-                                PhoenixStatement.this, plan);
+                        // Send mutations to hbase, so they are visible to subsequent reads.
+                        // Use original plan for data table so that data and immutable indexes will be sent
+                        boolean isTransactional = connection.getMutationState().startTransaction(plan.getContext().getResolver().getTables().iterator());
+                        plan = connection.getQueryServices().getOptimizer().optimize(PhoenixStatement.this, plan);
+                        if (isTransactional) {
+                            // After optimize so that we have the right context object
+                            plan.getContext().setTransaction(connection.getMutationState().getTransaction());
+                        }
                          // this will create its own trace internally, so we don't wrap this
                          // whole thing in tracing
                         ResultIterator resultIterator = plan.iterator();
-                        if (connection.getAutoCommit()) {
-                            connection.commit(); // Forces new read point for next statement
-                        }
-                        else {
-                        	// send mutations to hbase, so they are visible to subsequent reads
-                        	connection.getMutationState().send();
-                        }
                         if (logger.isDebugEnabled()) {
                             String explainPlan = QueryUtil.getExplainPlan(resultIterator);
                             logger.debug(LogUtil.addCustomAnnotations("Explain plan: " + explainPlan, connection));
@@ -257,6 +255,10 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
                         setLastResultSet(rs);
                         setLastUpdateCount(NO_UPDATE);
                         setLastUpdateOperation(stmt.getOperation());
+                        // If transactional, this will move the read pointer forward
+                        if (connection.getAutoCommit()) {
+                            connection.commit();
+                        }
                         return rs;
                     } catch (RuntimeException e) {
                         // FIXME: Expression.evaluate does not throw SQLException
@@ -279,13 +281,14 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
         }
     }
     
-    public void startTransaction(StatementPlan plan) throws SQLException {
+    private boolean startTransaction(StatementPlan plan) throws SQLException {
         for (TableRef ref : plan.getContext().getResolver().getTables()) {
             if (ref.getTable().isTransactional()) {
                 connection.getMutationState().startTransaction();
-                break;
+                return true;
             }
         }
+        return false;
     }
     
     protected int executeMutation(final CompilableStatement stmt) throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/714284d3/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 01cf8b2..1655a57 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -2069,7 +2069,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     
     @Override
     public MutationState updateData(MutationPlan plan) throws SQLException {
-        plan.getContext().getStatement().startTransaction(plan);
         return plan.execute();
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/714284d3/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java
index bd88770..b5351bf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java
@@ -97,10 +97,7 @@ public class TableRef {
     
     @Override
     public int hashCode() {
-        final int prime = 31;
-        int result = alias == null ? 0 : alias.hashCode();
-        result = prime * result + this.table.getName().getString().hashCode();
-        return result;
+        return this.table.getName().getString().hashCode();
     }
 
     @Override
@@ -109,7 +106,8 @@ public class TableRef {
         if (obj == null) return false;
         if (getClass() != obj.getClass()) return false;
         TableRef other = (TableRef)obj;
-        if ((alias == null && other.alias != null) || (alias != null && !alias.equals(other.alias))) return false;
+        // If alias is null, it matches any other alias
+        if (alias != null && other.alias != null && !alias.equals(other.alias)) return false;
         if (!table.getName().getString().equals(other.table.getName().getString())) return false;
         return true;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/714284d3/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 03f7d0f..4ab438b 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -81,7 +81,6 @@ import static org.apache.phoenix.util.TestUtil.STABLE_NAME;
 import static org.apache.phoenix.util.TestUtil.TABLE_WITH_ARRAY;
 import static org.apache.phoenix.util.TestUtil.TABLE_WITH_SALTING;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.apache.phoenix.util.TestUtil.TRANSACTIONAL_DATA_TABLE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -418,7 +417,6 @@ public abstract class BaseTest {
                 "    kv bigint)\n");
         builder.put(INDEX_DATA_TABLE, "create table " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + INDEX_DATA_TABLE + TEST_TABLE_SCHEMA + "IMMUTABLE_ROWS=true");
         builder.put(MUTABLE_INDEX_DATA_TABLE, "create table " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + MUTABLE_INDEX_DATA_TABLE + TEST_TABLE_SCHEMA);
-        builder.put(TRANSACTIONAL_DATA_TABLE, "create table " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + TRANSACTIONAL_DATA_TABLE + TEST_TABLE_SCHEMA + "TRANSACTIONAL=true");
         builder.put("SumDoubleTest","create table SumDoubleTest" +
                 "   (id varchar not null primary key, d DOUBLE, f FLOAT, ud UNSIGNED_DOUBLE, uf UNSIGNED_FLOAT, i integer, de decimal)");
         builder.put(JOIN_ORDER_TABLE_FULL_NAME, "create table " + JOIN_ORDER_TABLE_FULL_NAME +