You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by td...@apache.org on 2015/10/26 23:41:45 UTC

[1/4] phoenix git commit: Increase testing around transaction integration PHOENIX-1900

Repository: phoenix
Updated Branches:
  refs/heads/txn acacaf340 -> d81e660e3


http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/it/java/org/apache/phoenix/tx/TxPointInTimeQueryIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxPointInTimeQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxPointInTimeQueryIT.java
index f94ce39..8a4f376 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxPointInTimeQueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxPointInTimeQueryIT.java
@@ -41,33 +41,23 @@ public class TxPointInTimeQueryIT extends BaseClientManagedTimeIT {
 	public void initTable() throws Exception {
 		ts = nextTimestamp();
 	}
-	
+
 	@Test
 	public void testQueryWithSCN() throws Exception {
 		Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
 		props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
-		Connection conn = DriverManager.getConnection(getUrl(), props);
-		try {
-			conn.createStatement()
-					.execute(
-							"CREATE TABLE t (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR) TRANSACTIONAL=true");
-
-			props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 10));
-			conn = DriverManager.getConnection(getUrl(), props);
-
-			String selectQuery = "SELECT k FROM t";
+		try (Connection conn = DriverManager.getConnection(getUrl(), props);) {
 			try {
-				conn.createStatement().executeQuery(selectQuery);
+				conn.createStatement()
+						.execute(
+								"CREATE TABLE t (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR) TRANSACTIONAL=true");
 				fail();
 			} catch (SQLException e) {
 				assertEquals("Unexpected Exception",
-						SQLExceptionCode.CANNOT_START_TRANSACTION_WITH_SCN_SET.getErrorCode(),
-						e.getErrorCode());
+						SQLExceptionCode.CANNOT_START_TRANSACTION_WITH_SCN_SET
+								.getErrorCode(), e.getErrorCode());
 			}
-
-		} finally {
-			conn.close();
 		}
 	}
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
index db98e25..855915d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
@@ -347,7 +347,7 @@ public class FromCompiler {
             PTable theTable = null;
             if (updateCacheImmediately || connection.getAutoCommit()) {
                 MetaDataMutationResult result = client.updateCache(schemaName, tableName);
-                timeStamp = TransactionUtil.getTableTimestamp(connection, result);
+                timeStamp = TransactionUtil.getResolvedTimestamp(connection, result);
                 theTable = result.getTable();
                 if (theTable == null) {
                     throw new TableNotFoundException(schemaName, tableName, timeStamp);
@@ -367,7 +367,7 @@ public class FromCompiler {
                 if (theTable == null) {
                     MetaDataMutationResult result = client.updateCache(schemaName, tableName);
                     if (result.wasUpdated()) {
-                    	timeStamp = TransactionUtil.getTableTimestamp(connection, result);
+                    	timeStamp = TransactionUtil.getResolvedTimestamp(connection, result);
                         theTable = result.getTable();
                     }
                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 5d24f7e..ee724fc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -774,7 +774,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 // without making an additional query
                 PTable table =
                         loadTable(env, key, cacheKey, clientTimeStamp, HConstants.LATEST_TIMESTAMP);
-                if (table != null) {
+                if (table != null && !isTableDeleted(table)) {
                     if (table.getTimeStamp() < clientTimeStamp) {
                         // If the table is older than the client time stamp and it's deleted,
                         // continue
@@ -803,7 +803,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 // on the system table. This is an issue because of the way we manage batch mutation
                 // in the
                 // Indexer.
-                region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet());
+				region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet());
 
                 // Invalidate the cache - the next getTable call will add it
                 // TODO: consider loading the table that was just created here, patching up the parent table, and updating the cache

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index d89e19a..db50f83 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -250,7 +250,7 @@ public enum SQLExceptionCode {
     DEFAULT_COLUMN_FAMILY_ON_SHARED_TABLE(1056, "43A13", "Default column family not allowed on VIEW or shared INDEX"),
     ONLY_TABLE_MAY_BE_DECLARED_TRANSACTIONAL(1070, "44A01", "Only tables may be declared as transactional"),
     MAY_NOT_MAP_TO_EXISTING_TABLE_AS_TRANSACTIONAL(1071, "44A02", "An existing HBase table may not be mapped to as a transactional table"),
-	STORE_NULLS_MUST_BE_FALSE_FOR_TRANSACTIONAL(1072, "44A03", "Store nulls must be true when a table is transactional"),
+	STORE_NULLS_MUST_BE_TRUE_FOR_TRANSACTIONAL(1072, "44A03", "Store nulls must be true when a table is transactional"),
     CANNOT_START_TRANSACTION_WITH_SCN_SET(1073, "44A04", "Cannot start a transaction on a connection with SCN set"),
 
     /** Sequence related */

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 43fe28b..79da59f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -27,6 +27,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 
 import co.cask.tephra.Transaction;
@@ -196,13 +197,18 @@ public class MutationState implements SQLCloseable {
             }
             if (hasUncommittedData) {
                 try {
-                    tx = currentTx = connection.getQueryServices().getTransactionSystemClient().checkpoint(currentTx);
+                	if (txContext == null) {
+                		tx = currentTx = connection.getQueryServices().getTransactionSystemClient().checkpoint(currentTx);
+                	}  else {
+                		txContext.checkpoint();
+                		tx = currentTx = txContext.getCurrentTransaction();
+                	}
                     // Since we've checkpointed, we can clear out uncommitted set, since a statement run afterwards
                     // should see all this data.
                     uncommittedPhysicalNames.clear();
-                } catch (TransactionNotInProgressException e) {
+                } catch (TransactionFailureException | TransactionNotInProgressException e) {
                     throw new SQLException(e);
-                }
+				} 
             }
             // Since we're querying our own table while mutating it, we must exclude
             // see our current mutations, otherwise we can get erroneous results (for DELETE)
@@ -367,37 +373,14 @@ public class MutationState implements SQLCloseable {
         throwIfTooBig();
     }
     
-    private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, Map<PColumn, byte[]>> values, long timestamp, boolean includeMutableIndexes) {
+    private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, Map<PColumn, byte[]>> values, final long timestamp, boolean includeMutableIndexes, final boolean sendAll) {
         final Iterator<PTable> indexes = // Only maintain tables with immutable rows through this client-side mechanism
                 (tableRef.getTable().isImmutableRows() || includeMutableIndexes) ? 
                         IndexMaintainer.nonDisabledIndexIterator(tableRef.getTable().getIndexes().iterator()) : 
                         Iterators.<PTable>emptyIterator();
-        final List<Mutation> mutations = Lists.newArrayListWithExpectedSize(values.size());
+        final List<Mutation> mutationList = Lists.newArrayListWithExpectedSize(values.size());
         final List<Mutation> mutationsPertainingToIndex = indexes.hasNext() ? Lists.<Mutation>newArrayListWithExpectedSize(values.size()) : null;
-        Iterator<Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>>> iterator = values.entrySet().iterator();
-        final ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-        while (iterator.hasNext()) {
-            Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry = iterator.next();
-            ImmutableBytesPtr key = rowEntry.getKey();
-            PRow row = tableRef.getTable().newRow(connection.getKeyValueBuilder(), timestamp, key);
-            List<Mutation> rowMutations, rowMutationsPertainingToIndex;
-            if (rowEntry.getValue() == PRow.DELETE_MARKER) { // means delete
-                row.delete();
-                rowMutations = row.toRowMutations();
-                // Row deletes for index tables are processed by running a re-written query
-                // against the index table (as this allows for flexibility in being able to
-                // delete rows).
-                rowMutationsPertainingToIndex = Collections.emptyList();
-            } else {
-                for (Map.Entry<PColumn,byte[]> valueEntry : rowEntry.getValue().entrySet()) {
-                    row.setValue(valueEntry.getKey(), valueEntry.getValue());
-                }
-                rowMutations = row.toRowMutations();
-                rowMutationsPertainingToIndex = rowMutations;
-            }
-            mutations.addAll(rowMutations);
-            if (mutationsPertainingToIndex != null) mutationsPertainingToIndex.addAll(rowMutationsPertainingToIndex);
-        }
+        generateMutations(tableRef, timestamp, values, mutationList, mutationsPertainingToIndex);
         return new Iterator<Pair<byte[],List<Mutation>>>() {
             boolean isFirst = true;
 
@@ -410,14 +393,24 @@ public class MutationState implements SQLCloseable {
             public Pair<byte[], List<Mutation>> next() {
                 if (isFirst) {
                     isFirst = false;
-                    return new Pair<byte[],List<Mutation>>(tableRef.getTable().getPhysicalName().getBytes(),mutations);
+                    return new Pair<byte[],List<Mutation>>(tableRef.getTable().getPhysicalName().getBytes(),mutationList);
                 }
                 PTable index = indexes.next();
                 List<Mutation> indexMutations;
                 try {
                     indexMutations =
                             IndexUtil.generateIndexData(tableRef.getTable(), index, mutationsPertainingToIndex,
-                                    ptr, connection.getKeyValueBuilder(), connection);
+                                    connection.getKeyValueBuilder(), connection);
+                    // we may also have to include delete mutations for immutable tables if we are not processing all the tables in the mutations map
+                    if (!sendAll) {
+	                    TableRef key = new TableRef(index);
+						Map<ImmutableBytesPtr, Map<PColumn, byte[]>> rowToColumnMap = mutations.remove(key);
+	                    if (rowToColumnMap!=null) {
+		                    final List<Mutation> deleteMutations = Lists.newArrayList();
+		                    generateMutations(tableRef, timestamp, rowToColumnMap, deleteMutations, null);
+		                    indexMutations.addAll(deleteMutations);
+	                    }
+                    }
                 } catch (SQLException e) {
                     throw new IllegalDataException(e);
                 }
@@ -431,6 +424,35 @@ public class MutationState implements SQLCloseable {
             
         };
     }
+
+	private void generateMutations(final TableRef tableRef, long timestamp,
+			final Map<ImmutableBytesPtr, Map<PColumn, byte[]>> values,
+			final List<Mutation> mutationList,
+			final List<Mutation> mutationsPertainingToIndex) {
+		Iterator<Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>>> iterator = values.entrySet().iterator();
+        while (iterator.hasNext()) {
+            Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry = iterator.next();
+            ImmutableBytesPtr key = rowEntry.getKey();
+			PRow row = tableRef.getTable().newRow(connection.getKeyValueBuilder(), timestamp, key);
+			List<Mutation> rowMutations, rowMutationsPertainingToIndex;
+			if (rowEntry.getValue() == PRow.DELETE_MARKER) { // means delete
+			    row.delete();
+			    rowMutations = row.toRowMutations();
+			    // Row deletes for index tables are processed by running a re-written query
+			    // against the index table (as this allows for flexibility in being able to
+			    // delete rows).
+			    rowMutationsPertainingToIndex = Collections.emptyList();
+			} else {
+			    for (Map.Entry<PColumn,byte[]> valueEntry : rowEntry.getValue().entrySet()) {
+			        row.setValue(valueEntry.getKey(), valueEntry.getValue());
+			    }
+			    rowMutations = row.toRowMutations();
+			    rowMutationsPertainingToIndex = rowMutations;
+			}
+			mutationList.addAll(rowMutations);
+			if (mutationsPertainingToIndex != null) mutationsPertainingToIndex.addAll(rowMutationsPertainingToIndex);
+        }
+	}
     
     /**
      * Get the unsorted list of HBase mutations for the tables with uncommitted data.
@@ -453,7 +475,7 @@ public class MutationState implements SQLCloseable {
             private Iterator<Pair<byte[],List<Mutation>>> innerIterator = init();
                     
             private Iterator<Pair<byte[],List<Mutation>>> init() {
-                return addRowMutations(current.getKey(), current.getValue(), timestamp, includeMutableIndexes);
+                return addRowMutations(current.getKey(), current.getValue(), timestamp, includeMutableIndexes, true);
             }
             
             @Override
@@ -651,7 +673,7 @@ public class MutationState implements SQLCloseable {
             boolean isDataTable = true;
             // Validate as we go if transactional since we can undo if a problem occurs (which is unlikely)
             long serverTimestamp = serverTimeStamps == null ? validate(tableRef, valuesMap) : serverTimeStamps[i++];
-            Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, serverTimestamp, false);
+            Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, serverTimestamp, false, sendAll);
             while (mutationsIterator.hasNext()) {
                 Pair<byte[],List<Mutation>> pair = mutationsIterator.next();
                 byte[] htableName = pair.getFirst();
@@ -855,13 +877,17 @@ public class MutationState implements SQLCloseable {
     }
     
     public void commit() throws SQLException {
+    	boolean sendMutationsFailed=false;
         try {
             send();
+        } catch (Throwable t) {
+        	sendMutationsFailed=true;
+        	throw t;
         } finally {
             txAwares.clear();
             if (txContext != null) {
                 try {
-                    if (txStarted) {
+                    if (txStarted && !sendMutationsFailed) {
                         txContext.finish();
                     }
                 } catch (TransactionFailureException e) {
@@ -872,8 +898,10 @@ public class MutationState implements SQLCloseable {
                         throw TransactionUtil.getSQLException(e);
                     }
                 } finally {
-                	reset();
-                }
+                  	if (!sendMutationsFailed) {
+                  		reset();
+                  	}
+                  }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index b0f87cb..bd0461d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -34,6 +34,7 @@ import java.util.Set;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.client.Delete;
@@ -83,6 +84,8 @@ import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TrustedByteArrayOutputStream;
 
+import co.cask.tephra.TxConstants;
+
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
@@ -822,12 +825,15 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         int nDeleteCF = 0;
         int nDeleteVersionCF = 0;
         for (Cell kv : pendingUpdates) {
-        	if (kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) {
-        	    nDeleteCF++;
-        	}
-        	else if (kv.getTypeByte() == KeyValue.Type.DeleteFamilyVersion.getCode()) {
+        	if (kv.getTypeByte() == KeyValue.Type.DeleteFamilyVersion.getCode()) {
                 nDeleteVersionCF++;
             }
+        	else if (kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()
+        			// Since we don't include the index rows in the change set for txn tables, we need to detect row deletes that have transformed by TransactionProcessor
+        			// TODO see if implement PhoenixTransactionalIndexer.preDelete will work instead of the following check
+        			|| (CellUtil.matchingQualifier(kv, TxConstants.FAMILY_DELETE_QUALIFIER) && CellUtil.matchingValue(kv, HConstants.EMPTY_BYTE_ARRAY))) {
+        	    nDeleteCF++;
+        	}
         }
         // This is what a delete looks like on the server side for mutable indexing...
         // Should all be one or the other for DeleteFamily versus DeleteFamilyVersion, but just in case not
@@ -850,7 +856,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         	Cell newValue = newState.get(ref);
         	if (newValue != null) { // Indexed column has potentially changed
         	    ImmutableBytesWritable oldValue = oldState.getLatestValue(ref);
-        		boolean newValueSetAsNull = (newValue.getTypeByte() == Type.DeleteColumn.getCode() || newValue.getTypeByte() == Type.Delete.getCode());
+        		boolean newValueSetAsNull = (newValue.getTypeByte() == Type.DeleteColumn.getCode() || newValue.getTypeByte() == Type.Delete.getCode() || CellUtil.matchingValue(newValue, HConstants.EMPTY_BYTE_ARRAY));
         		//If the new column value has to be set as null and the older value is null too,
         		//then just skip to the next indexed column.
         		if (newValueSetAsNull && oldValue == null) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
index 14f8a1f..2719119 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
@@ -86,13 +86,10 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
         ptr.set(state.getCurrentRowKey());
         List<IndexUpdate> indexUpdates = Lists.newArrayList();
         for (IndexMaintainer maintainer : indexMaintainers) {
-            // Check both immutable and local, as for transactional tables, we use an index maintainer
+            // For transactional tables, we use an index maintainer
             // to aid in rollback if there's a KeyValue column in the index. The alternative would be
             // to hold on to all uncommitted index row keys (even ones already sent to HBase) on the
             // client side.
-            if (maintainer.isImmutableRows() && maintainer.isLocalIndex()) {
-                continue;
-            }
             Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns());
             ValueGetter valueGetter = statePair.getFirst();
             IndexUpdate indexUpdate = statePair.getSecond();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 3d5da43..9f03747 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -17,6 +17,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 
 import co.cask.tephra.Transaction;
@@ -28,11 +29,14 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
@@ -137,12 +141,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
 
         Map<String,byte[]> updateAttributes = m.getAttributesMap();
         PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(c.getEnvironment(),updateAttributes);
-        if (m.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) == null) {
-        	// Unless we're aborting the transaction, we do not want to see our own transaction writes,
-        	// since index maintenance requires seeing the previously committed data in order to function
-        	// properly.
-        	indexMetaData.getTransaction().setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
-        }
+        byte[] txRollbackAttribute = m.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY);
         Collection<Pair<Mutation, byte[]>> indexUpdates = null;
         // get the current span, or just use a null-span to avoid a bunch of if statements
         try (TraceScope scope = Trace.startSpan("Starting to build index updates")) {
@@ -152,7 +151,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
             }
 
             // get the index updates for all elements in this batch
-            indexUpdates = getIndexUpdates(c.getEnvironment(), indexMetaData, getMutationIterator(miniBatchOp));
+            indexUpdates = getIndexUpdates(c.getEnvironment(), indexMetaData, getMutationIterator(miniBatchOp), txRollbackAttribute);
 
             current.addTimelineAnnotation("Built index updates, doing preStep");
             TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());
@@ -168,10 +167,10 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
         }
     }
 
-    private Collection<Pair<Mutation, byte[]>> getIndexUpdates(RegionCoprocessorEnvironment env, PhoenixIndexMetaData indexMetaData, Iterator<Mutation> mutationIterator) throws IOException {
-        ResultScanner scanner = null;
+    private Collection<Pair<Mutation, byte[]>> getIndexUpdates(RegionCoprocessorEnvironment env, PhoenixIndexMetaData indexMetaData, Iterator<Mutation> mutationIterator, byte[] txRollbackAttribute) throws IOException {
+        ResultScanner currentScanner = null;
+        ResultScanner previousScanner = null;
         TransactionAwareHTable txTable = null;
-        
         // Collect up all mutations in batch
         Map<ImmutableBytesPtr, MultiMutation> mutations =
                 new HashMap<ImmutableBytesPtr, MultiMutation>();
@@ -196,13 +195,11 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
         List<IndexMaintainer> indexMaintainers = indexMetaData.getIndexMaintainers();
         Set<ColumnReference> mutableColumns = Sets.newHashSetWithExpectedSize(indexMaintainers.size() * 10);
         for (IndexMaintainer indexMaintainer : indexMaintainers) {
-            // Check both immutable and local, as for transactional tables, we use an index maintainer
+            // For transactional tables, we use an index maintainer
             // to aid in rollback if there's a KeyValue column in the index. The alternative would be
             // to hold on to all uncommitted index row keys (even ones already sent to HBase) on the
             // client side.
-            if (!indexMaintainer.isImmutableRows() || !indexMaintainer.isLocalIndex()) {
                 mutableColumns.addAll(indexMaintainer.getAllColumns());
-            }
         }
 
         Collection<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>(mutations.size() * 2 * indexMaintainers.size());
@@ -226,39 +223,23 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
                 HTableInterface htable = env.getTable(tableName);
                 txTable = new TransactionAwareHTable(htable);
                 txTable.startTx(tx);
-                scanner = txTable.getScanner(scan);
-            }
-            ColumnReference emptyColRef = new ColumnReference(indexMaintainers.get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES);
-            if (scanner != null) {
-                Result result;
-                while ((result = scanner.next()) != null) {
-                    Mutation m = mutations.remove(new ImmutableBytesPtr(result.getRow()));
-                    byte[] attribValue = m.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY);
-                    TxTableState state = new TxTableState(env, mutableColumns, indexMetaData.getAttributes(), tx.getWritePointer(), m, emptyColRef, result);
-                    Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state, indexMetaData);
-                    for (IndexUpdate delete : deletes) {
-                        if (delete.isValid()) {
-                            delete.getUpdate().setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, attribValue);
-                            indexUpdates.add(new Pair<Mutation, byte[]>(delete.getUpdate(),delete.getTableName()));
-                        }
-                    }
-                    state.applyMutation();
-                    Iterable<IndexUpdate> puts = codec.getIndexUpserts(state, indexMetaData);
-                    for (IndexUpdate put : puts) {
-                        if (put.isValid()) {
-                            indexUpdates.add(new Pair<Mutation, byte[]>(put.getUpdate(),put.getTableName()));
-                        }
-                    }
+                currentScanner = txTable.getScanner(scan);
+                if (txRollbackAttribute!=null) {
+	                tx.setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
+	                previousScanner = txTable.getScanner(scan);
                 }
             }
+            // In case of rollback we have to do two scans, one with VisibilityLevel.SNAPSHOT to see the current state of the row 
+            // and another with VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT to see the previous state of the row
+            // so that we can rollback a previous delete + put 
+            processScanner(env, indexMetaData, txRollbackAttribute, previousScanner, mutations, tx, mutableColumns, indexUpdates, false);
+            processScanner(env, indexMetaData, txRollbackAttribute, currentScanner, mutations, tx, mutableColumns, indexUpdates, true);
             for (Mutation m : mutations.values()) {
-                TxTableState state = new TxTableState(env, mutableColumns, indexMetaData.getAttributes(), tx.getWritePointer(), m);
-                state.applyMutation();
-                Iterable<IndexUpdate> puts = codec.getIndexUpserts(state, indexMetaData);
-                for (IndexUpdate put : puts) {
-                    if (put.isValid()) {
-                        indexUpdates.add(new Pair<Mutation, byte[]>(put.getUpdate(),put.getTableName()));
-                    }
+            	long timestamp = getTimestamp(txRollbackAttribute, tx.getWritePointer(), m);
+            	TxTableState state = new TxTableState(env, mutableColumns, indexMetaData.getAttributes(), timestamp, m);
+            	// if we did not generate valid put, we might have to generate a delete
+                if (!generatePuts(indexMetaData, indexUpdates, state)) {
+                	generateDeletes(indexMetaData, indexUpdates, txRollbackAttribute, state);
                 }
             }
         } finally {
@@ -268,6 +249,68 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
         return indexUpdates;
     }
 
+	private void processScanner(RegionCoprocessorEnvironment env,
+			PhoenixIndexMetaData indexMetaData, byte[] txRollbackAttribute,
+			ResultScanner scanner,
+			Map<ImmutableBytesPtr, MultiMutation> mutations, Transaction tx,
+			Set<ColumnReference> mutableColumns,
+			Collection<Pair<Mutation, byte[]>> indexUpdates, boolean removeMutation) throws IOException {
+		if (scanner != null) {
+		    Result result;
+		    ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES);
+		    while ((result = scanner.next()) != null) {
+		        Mutation m = removeMutation ? mutations.remove(new ImmutableBytesPtr(result.getRow())) : mutations.get(new ImmutableBytesPtr(result.getRow()));
+		        long timestamp = getTimestamp(txRollbackAttribute, tx.getWritePointer(), m);
+		        TxTableState state = new TxTableState(env, mutableColumns, indexMetaData.getAttributes(), timestamp, m, emptyColRef, result);
+		        generateDeletes(indexMetaData, indexUpdates, txRollbackAttribute, state);
+		        generatePuts(indexMetaData, indexUpdates, state);
+		    }
+		}
+	}
+
+	private long getTimestamp(byte[] txRollbackAttribute, long txnWritePointer, Mutation m) {
+		if (txRollbackAttribute==null) {
+			return txnWritePointer;
+		}
+		// if this is a rollback generate mutations with the same timestamp as the data row mutation as the timestamp might be 
+        // different from the current txn write pointer because of check points
+		long mutationTimestamp = txnWritePointer;
+		for (Entry<byte[], List<Cell>> entry : m.getFamilyCellMap().entrySet()) {
+			mutationTimestamp = entry.getValue().get(0).getTimestamp();
+			break;
+		}
+		return mutationTimestamp;
+	}
+
+	private void generateDeletes(PhoenixIndexMetaData indexMetaData,
+			Collection<Pair<Mutation, byte[]>> indexUpdates,
+			byte[] attribValue, TxTableState state) throws IOException {
+		Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state, indexMetaData);
+		for (IndexUpdate delete : deletes) {
+		    if (delete.isValid()) {
+		        delete.getUpdate().setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, attribValue);
+		        indexUpdates.add(new Pair<Mutation, byte[]>(delete.getUpdate(),delete.getTableName()));
+		    }
+		}
+	}
+
+	boolean generatePuts(
+			PhoenixIndexMetaData indexMetaData,
+			Collection<Pair<Mutation, byte[]>> indexUpdates,
+			TxTableState state)
+			throws IOException {
+		state.applyMutation();
+		Iterable<IndexUpdate> puts = codec.getIndexUpserts(state, indexMetaData);
+		boolean validPut = false;
+		for (IndexUpdate put : puts) {
+		    if (put.isValid()) {
+		        indexUpdates.add(new Pair<Mutation, byte[]>(put.getUpdate(),put.getTableName()));
+		        validPut = true;
+		    }
+		}
+		return validPut;
+	}
+
 
     private static class TxTableState implements TableState {
         private final Mutation mutation;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index a073b84..35a8663 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -215,7 +215,8 @@ public class QueryOptimizer {
     
     private static QueryPlan addPlan(PhoenixStatement statement, SelectStatement select, PTable index, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, QueryPlan dataPlan, boolean isHinted) throws SQLException {
         int nColumns = dataPlan.getProjector().getColumnCount();
-        String alias = '"' + dataPlan.getTableRef().getTableAlias() + '"'; // double quote in case it's case sensitive
+        String tableAlias = dataPlan.getTableRef().getTableAlias();
+		String alias = tableAlias==null ? null : '"' + tableAlias + '"'; // double quote in case it's case sensitive
         String schemaName = index.getParentSchemaName().getString();
         schemaName = schemaName.length() == 0 ? null :  '"' + schemaName + '"';
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 5667029..b2982e4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1228,7 +1228,7 @@ public class MetaDataClient {
             Long timestamp = null;
             if (parent != null) {
                 transactional = parent.isTransactional();
-                timestamp = TransactionUtil.getTableTimestamp(connection, transactional, null);
+                timestamp = TransactionUtil.getTableTimestamp(connection, transactional);
                 storeNulls = parent.getStoreNulls();
                 if (tableType == PTableType.INDEX) {
                     // Index on view
@@ -1376,12 +1376,15 @@ public class MetaDataClient {
             if (transactional) { // FIXME: remove once Tephra handles storing multiple versions of a cell value, 
             	// and allows ignoring empty key values for an operation
             	if (Boolean.FALSE.equals(storeNullsProp)) {
-            		throw new SQLExceptionInfo.Builder(SQLExceptionCode.STORE_NULLS_MUST_BE_FALSE_FOR_TRANSACTIONAL)
+            		throw new SQLExceptionInfo.Builder(SQLExceptionCode.STORE_NULLS_MUST_BE_TRUE_FOR_TRANSACTIONAL)
             		.setSchemaName(schemaName).setTableName(tableName)
             		.build().buildException();
             	}
+            	// Force STORE_NULLS to true when transactional as Tephra cannot deal with column deletes
+            	storeNulls = true;
+            	tableProps.put(PhoenixDatabaseMetaData.STORE_NULLS, Boolean.TRUE);
             }
-            timestamp = timestamp==null ? TransactionUtil.getTableTimestamp(connection, transactional, null) : timestamp;
+            timestamp = timestamp==null ? TransactionUtil.getTableTimestamp(connection, transactional) : timestamp;
 
             // Delay this check as it is supported to have IMMUTABLE_ROWS and SALT_BUCKETS defined on views
             if (statement.getTableType() == PTableType.VIEW || indexId != null) {
@@ -1891,8 +1894,7 @@ public class MetaDataClient {
 
                 .setSchemaName(schemaName).setTableName(tableName).build().buildException();
             default:
-				connection.removeTable(tenantId, SchemaUtil.getTableName(schemaName, tableName), parentTableName,
-                        TransactionUtil.getTableTimestamp(connection, transactional, result.getMutationTime()));
+				connection.removeTable(tenantId, SchemaUtil.getTableName(schemaName, tableName), parentTableName, result.getMutationTime());
 
                 if (result.getTable() != null && tableType != PTableType.VIEW) {
                     connection.setAutoCommit(true);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index a32e922..b78a904 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -831,7 +831,7 @@ public class PTableImpl implements PTable {
 
     @Override
     public synchronized boolean getIndexMaintainers(ImmutableBytesWritable ptr, PhoenixConnection connection) {
-        if (indexMaintainersPtr == null) {
+        if (indexMaintainersPtr == null || indexMaintainersPtr.getLength()==0) {
             indexMaintainersPtr = new ImmutableBytesWritable();
             if (indexes.isEmpty()) {
                 indexMaintainersPtr.set(ByteUtil.EMPTY_BYTE_ARRAY);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 51428ea..62ec1f0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -233,9 +233,10 @@ public class IndexUtil {
     }
     
     public static List<Mutation> generateIndexData(final PTable table, PTable index,
-            List<Mutation> dataMutations, ImmutableBytesWritable ptr, final KeyValueBuilder kvBuilder, PhoenixConnection connection)
+            List<Mutation> dataMutations, final KeyValueBuilder kvBuilder, PhoenixConnection connection)
             throws SQLException {
         try {
+        	final ImmutableBytesWritable ptr = new ImmutableBytesWritable();
             IndexMaintainer maintainer = index.getIndexMaintainer(table, connection);
             List<Mutation> indexMutations = Lists.newArrayListWithExpectedSize(dataMutations.size());
             for (final Mutation dataMutation : dataMutations) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
index 0998e72..c8c468d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.HTablePool;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.phoenix.exception.PhoenixIOException;
 import org.apache.phoenix.exception.SQLExceptionCode;
@@ -66,7 +67,9 @@ public class ServerUtil {
         } else if (t instanceof IOException) {
             // If the IOException does not wrap any exception, then bubble it up.
             Throwable cause = t.getCause();
-            if (cause == null || cause instanceof IOException) {
+            if (cause instanceof RetriesExhaustedWithDetailsException)
+            	return new DoNotRetryIOException(t.getMessage(), cause);
+            else if (cause == null || cause instanceof IOException) {
                 return (IOException) t;
             }
             // Else assume it's been wrapped, so throw as DoNotRetryIOException to prevent client hanging while retrying

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
index 893a895..020ac3b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
@@ -17,17 +17,8 @@
  */
 package org.apache.phoenix.util;
 
-import java.io.IOException;
 import java.sql.SQLException;
 
-import co.cask.tephra.Transaction;
-import co.cask.tephra.TransactionCodec;
-import co.cask.tephra.TransactionConflictException;
-import co.cask.tephra.TransactionFailureException;
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.hbase98.TransactionAwareHTable;
-
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.exception.SQLExceptionCode;
@@ -36,16 +27,21 @@ import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.schema.PTable;
 
+import co.cask.tephra.TransactionConflictException;
+import co.cask.tephra.TransactionFailureException;
+import co.cask.tephra.TxConstants;
+import co.cask.tephra.hbase98.TransactionAwareHTable;
+
 public class TransactionUtil {
     private TransactionUtil() {
     }
     
     public static long convertToNanoseconds(long serverTimeStamp) {
-        return serverTimeStamp * 1000000;
+        return serverTimeStamp * TxConstants.MAX_TX_PER_MS;
     }
     
-    public static long convertToMillisecods(Long serverTimeStamp) {
-        return serverTimeStamp / 1000000;
+    public static long convertToMilliseconds(long serverTimeStamp) {
+        return serverTimeStamp / TxConstants.MAX_TX_PER_MS;
     }
     
     public static SQLException getSQLException(TransactionFailureException e) {
@@ -67,10 +63,11 @@ public class TransactionUtil {
     	return new TransactionAwareHTable(htable, table.isImmutableRows() ? TxConstants.ConflictDetection.NONE : TxConstants.ConflictDetection.ROW);
     }
     
-	public static long getResolvedTimestamp(PhoenixConnection connection, boolean isTransactional, Long defaultResolvedTimestamp) {
+    // we resolve transactional tables at the txn read pointer
+	public static long getResolvedTimestamp(PhoenixConnection connection, boolean isTransactional, long defaultResolvedTimestamp) {
 		MutationState mutationState = connection.getMutationState();
 		Long scn = connection.getSCN();
-	    return scn != null ?  scn : (isTransactional && mutationState.isTransactionStarted()) ? convertToMillisecods(mutationState.getReadPointer()) : defaultResolvedTimestamp;
+	    return scn != null ?  scn : (isTransactional && mutationState.isTransactionStarted()) ? convertToMilliseconds(mutationState.getReadPointer()) : defaultResolvedTimestamp;
 	}
 
 	public static long getResolvedTime(PhoenixConnection connection, MetaDataMutationResult result) {
@@ -79,21 +76,30 @@ public class TransactionUtil {
 		return getResolvedTimestamp(connection, isTransactional, result.getMutationTime());
 	}
 
-	public static long getTableTimestamp(PhoenixConnection connection, MetaDataMutationResult result) {
+	public static long getResolvedTimestamp(PhoenixConnection connection, MetaDataMutationResult result) {
 		PTable table = result.getTable();
 		MutationState mutationState = connection.getMutationState();
 		boolean txInProgress = table != null && table.isTransactional() && mutationState.isTransactionStarted();
-		return  txInProgress ? convertToMillisecods(mutationState.getReadPointer()) : result.getMutationTime();
+		return  txInProgress ? convertToMilliseconds(mutationState.getReadPointer()) : result.getMutationTime();
 	}
 
-	public static Long getTableTimestamp(PhoenixConnection connection, boolean transactional, Long mutationTime) throws SQLException {
-		Long timestamp = mutationTime;
+	public static Long getTableTimestamp(PhoenixConnection connection, boolean transactional) throws SQLException {
+		Long timestamp = null;
+		if (!transactional) {
+			return timestamp;
+		}
 		MutationState mutationState = connection.getMutationState();
-		if (transactional && !mutationState.isTransactionStarted() && connection.getSCN()==null) {
+		// we need to burn a txn so that we are sure the txn read pointer is close to wall clock time
+		if (!mutationState.isTransactionStarted()) {
 			mutationState.startTransaction();
-			timestamp = convertToMillisecods(mutationState.getReadPointer());
 			connection.commit();
 		}
+		else {
+			connection.commit();
+		}
+		mutationState.startTransaction();
+		timestamp = convertToMilliseconds(mutationState.getReadPointer());
+		connection.commit();
 		return timestamp;
 	}
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index da965c8..9cbde9d 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -134,6 +134,7 @@ import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.TableAlreadyExistsException;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.util.ConfigUtil;
+import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
@@ -178,7 +179,7 @@ import com.google.inject.util.Providers;
  *
  */
 public abstract class BaseTest {
-    private static final String TEST_TABLE_SCHEMA = "(" +
+    protected static final String TEST_TABLE_SCHEMA = "(" +
 	                "   varchar_pk VARCHAR NOT NULL, " +
 	                "   char_pk CHAR(6) NOT NULL, " +
 	                "   int_pk INTEGER NOT NULL, "+ 
@@ -468,9 +469,9 @@ public abstract class BaseTest {
         return conf.get(QueryServices.ZOOKEEPER_PORT_ATTRIB);
     }
     
-    private static String url;
+    protected static String url;
     protected static PhoenixTestDriver driver;
-    private static boolean clusterInitialized = false;
+    protected static boolean clusterInitialized = false;
     private static HBaseTestingUtility utility;
     protected static final Configuration config = HBaseConfiguration.create(); 
     
@@ -495,7 +496,7 @@ public abstract class BaseTest {
         
     }
     
-    private static void setupTxManager() throws SQLException, IOException {
+    protected static void setupTxManager() throws SQLException, IOException {
         config.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
         config.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times");
         config.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1);
@@ -1729,7 +1730,81 @@ public abstract class BaseTest {
         return utility;
     }
 
-    protected static void createMultiCFTestTable(String tableName) throws SQLException {
+    // Populate the test table with data.
+	public static void populateTestTable(String fullTableName) throws SQLException {
+	    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+	    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+	        String upsert = "UPSERT INTO " + fullTableName
+	                + " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+	        PreparedStatement stmt = conn.prepareStatement(upsert);
+	        stmt.setString(1, "varchar1");
+	        stmt.setString(2, "char1");
+	        stmt.setInt(3, 1);
+	        stmt.setLong(4, 1L);
+	        stmt.setBigDecimal(5, new BigDecimal(1.0));
+	        Date date = DateUtil.parseDate("2015-01-01 00:00:00");
+	        stmt.setDate(6, date);
+	        stmt.setString(7, "varchar_a");
+	        stmt.setString(8, "chara");
+	        stmt.setInt(9, 2);
+	        stmt.setLong(10, 2L);
+	        stmt.setBigDecimal(11, new BigDecimal(2.0));
+	        stmt.setDate(12, date);
+	        stmt.setString(13, "varchar_b");
+	        stmt.setString(14, "charb");
+	        stmt.setInt(15, 3);
+	        stmt.setLong(16, 3L);
+	        stmt.setBigDecimal(17, new BigDecimal(3.0));
+	        stmt.setDate(18, date);
+	        stmt.executeUpdate();
+	        
+	        stmt.setString(1, "varchar2");
+	        stmt.setString(2, "char2");
+	        stmt.setInt(3, 2);
+	        stmt.setLong(4, 2L);
+	        stmt.setBigDecimal(5, new BigDecimal(2.0));
+	        date = DateUtil.parseDate("2015-01-02 00:00:00");
+	        stmt.setDate(6, date);
+	        stmt.setString(7, "varchar_a");
+	        stmt.setString(8, "chara");
+	        stmt.setInt(9, 3);
+	        stmt.setLong(10, 3L);
+	        stmt.setBigDecimal(11, new BigDecimal(3.0));
+	        stmt.setDate(12, date);
+	        stmt.setString(13, "varchar_b");
+	        stmt.setString(14, "charb");
+	        stmt.setInt(15, 4);
+	        stmt.setLong(16, 4L);
+	        stmt.setBigDecimal(17, new BigDecimal(4.0));
+	        stmt.setDate(18, date);
+	        stmt.executeUpdate();
+	        
+	        stmt.setString(1, "varchar3");
+	        stmt.setString(2, "char3");
+	        stmt.setInt(3, 3);
+	        stmt.setLong(4, 3L);
+	        stmt.setBigDecimal(5, new BigDecimal(3.0));
+	        date = DateUtil.parseDate("2015-01-03 00:00:00");
+	        stmt.setDate(6, date);
+	        stmt.setString(7, "varchar_a");
+	        stmt.setString(8, "chara");
+	        stmt.setInt(9, 4);
+	        stmt.setLong(10, 4L);
+	        stmt.setBigDecimal(11, new BigDecimal(4.0));
+	        stmt.setDate(12, date);
+	        stmt.setString(13, "varchar_b");
+	        stmt.setString(14, "charb");
+	        stmt.setInt(15, 5);
+	        stmt.setLong(16, 5L);
+	        stmt.setBigDecimal(17, new BigDecimal(5.0));
+	        stmt.setDate(18, date);
+	        stmt.executeUpdate();
+	        
+	        conn.commit();
+	    }
+	}
+
+	protected static void createMultiCFTestTable(String tableName, String options) throws SQLException {
         String ddl = "create table if not exists " + tableName + "(" +
                 "   varchar_pk VARCHAR NOT NULL, " +
                 "   char_pk CHAR(5) NOT NULL, " +
@@ -1747,7 +1822,8 @@ public abstract class BaseTest {
                 "   b.long_col2 BIGINT, " +
                 "   b.decimal_col2 DECIMAL, " +
                 "   b.date_col DATE " + 
-                "   CONSTRAINT pk PRIMARY KEY (varchar_pk, char_pk, int_pk, long_pk DESC, decimal_pk))";
+                "   CONSTRAINT pk PRIMARY KEY (varchar_pk, char_pk, int_pk, long_pk DESC, decimal_pk)) "
+                + (options!=null? options : "");
             Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
             Connection conn = DriverManager.getConnection(getUrl(), props);
             conn.createStatement().execute(ddl);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index afb6df3..1d10116 100644
--- a/pom.xml
+++ b/pom.xml
@@ -103,7 +103,7 @@
     <htrace.version>2.04</htrace.version>
     <collections.version>3.2.1</collections.version>
     <jodatime.version>2.3</jodatime.version>
-    <tephra.version>0.6.1</tephra.version>
+    <tephra.version>0.6.3-SNAPSHOT</tephra.version>
 
     <!-- Test Dependencies -->
     <mockito-all.version>1.8.5</mockito-all.version>


[3/4] phoenix git commit: Increase testing around transaction integration PHOENIX-1900

Posted by td...@apache.org.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java
new file mode 100644
index 0000000..3738e5b
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java
@@ -0,0 +1,884 @@
+package org.apache.phoenix.end2end.index;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Properties;
+
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.phoenix.compile.ColumnResolver;
+import org.apache.phoenix.compile.FromCompiler;
+import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.parse.NamedTableNode;
+import org.apache.phoenix.parse.TableName;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class IndexIT extends BaseHBaseManagedTimeIT {
+	
+	private final boolean localIndex;
+	private final String tableDDLOptions;
+	
+	public IndexIT(boolean localIndex, boolean mutable, boolean transactional) {
+		this.localIndex = localIndex;
+		StringBuilder optionBuilder = new StringBuilder();
+		if (!mutable) 
+			optionBuilder.append(" IMMUTABLE_ROWS=true ");
+		if (transactional) {
+			if (!(optionBuilder.length()==0))
+				optionBuilder.append(",");
+			optionBuilder.append(" TRANSACTIONAL=true ");
+		}
+		this.tableDDLOptions = optionBuilder.toString();
+	}
+	
+	@Parameters(name="localIndex = {0} , mutable = {1} , transactional = {2}")
+    public static Collection<Boolean[]> data() {
+        return Arrays.asList(new Boolean[][] {     
+                 { false, false, false }, { false, false, true }, { false, true, false }, { false, true, true }, 
+                 { true, false, false }, { true, false, true }, { true, true, false }, { true, true, true }
+           });
+    }
+
+	@Test
+    public void testIndexWithNullableFixedWithCols() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+	        conn.setAutoCommit(false);
+	        // create unique table and index names for each parameterized test
+	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
+	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
+	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+	        String ddl ="CREATE TABLE " + fullTableName + BaseTest.TEST_TABLE_SCHEMA + tableDDLOptions;
+	        Statement stmt = conn.createStatement();
+	        stmt.execute(ddl);
+	        BaseTest.populateTestTable(fullTableName);
+	        ddl = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName + " ON " + fullTableName 
+	                    + " (char_col1 ASC, int_col1 ASC)"
+	                    + " INCLUDE (long_col1, long_col2)";
+	        stmt.execute(ddl);
+	        
+	        String query = "SELECT d.char_col1, int_col1 from " + fullTableName + " as d";
+	        ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+	        if(localIndex) {
+	            assertEquals(
+	                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + tableName + " [-32768]\n" + 
+	                "    SERVER FILTER BY FIRST KEY ONLY\n" +
+	                "CLIENT MERGE SORT",
+	                QueryUtil.getExplainPlan(rs));
+	        } else {
+	            assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + indexName + "\n"
+	                    + "    SERVER FILTER BY FIRST KEY ONLY", QueryUtil.getExplainPlan(rs));
+	        }
+	        
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+	        assertEquals("chara", rs.getString(1));
+	        assertEquals("chara", rs.getString("char_col1"));
+	        assertEquals(2, rs.getInt(2));
+	        assertTrue(rs.next());
+	        assertEquals("chara", rs.getString(1));
+	        assertEquals(3, rs.getInt(2));
+	        assertTrue(rs.next());
+	        assertEquals("chara", rs.getString(1));
+	        assertEquals(4, rs.getInt(2));
+	        assertFalse(rs.next());
+	        
+	        conn.createStatement().execute("DROP INDEX " + indexName + " ON " + fullTableName);
+	        
+	        query = "SELECT char_col1, int_col1 from " + fullTableName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+	        
+	        query = "SELECT char_col1, int_col1 from "+indexName;
+	        try{
+	            rs = conn.createStatement().executeQuery(query);
+	            fail();
+	        } catch (SQLException e) {
+	            assertEquals(SQLExceptionCode.TABLE_UNDEFINED.getErrorCode(), e.getErrorCode());
+	        }
+        }
+    }
+    
+    @Test
+    public void testDeleteFromAllPKColumnIndex() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+	        conn.setAutoCommit(false);
+	        // create unique table and index names for each parameterized test
+	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
+	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
+	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+	        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+	        String ddl ="CREATE TABLE " + fullTableName + BaseTest.TEST_TABLE_SCHEMA + tableDDLOptions;
+	        Statement stmt = conn.createStatement();
+	        stmt.execute(ddl);
+	        BaseTest.populateTestTable(fullTableName);
+	        ddl = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName + " ON " + fullTableName
+	                    + " (long_pk, varchar_pk)"
+	                    + " INCLUDE (long_col1, long_col2)";
+	        stmt.execute(ddl);
+	        
+	        ResultSet rs;
+	        
+	        rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullTableName);
+	        assertTrue(rs.next());
+	        assertEquals(3,rs.getInt(1));
+	        rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullIndexName);
+	        assertTrue(rs.next());
+	        assertEquals(3,rs.getInt(1));
+	        
+	        String dml = "DELETE from " + fullTableName + " WHERE long_col2 = 4";
+	        assertEquals(1,conn.createStatement().executeUpdate(dml));
+	        conn.commit();
+	        
+	        String query = "SELECT /*+ NO_INDEX */ long_pk FROM " + fullTableName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+	        assertEquals(1L, rs.getLong(1));
+	        assertTrue(rs.next());
+	        assertEquals(3L, rs.getLong(1));
+	        assertFalse(rs.next());
+	        
+	        query = "SELECT long_pk FROM " + fullTableName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+	        assertEquals(1L, rs.getLong(1));
+	        assertTrue(rs.next());
+	        assertEquals(3L, rs.getLong(1));
+	        assertFalse(rs.next());
+	        
+	        query = "SELECT * FROM " + fullIndexName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+	        assertEquals(1L, rs.getLong(1));
+	        assertTrue(rs.next());
+	        assertEquals(3L, rs.getLong(1));
+	        assertFalse(rs.next());
+	        
+	        conn.createStatement().execute("DROP INDEX " + indexName + " ON " + fullTableName);
+        }
+    }
+    
+    //TODO ENABLE THIS TEST AFTER MERGING MASTER TO SEE IF THE SCAN IS CREATED CORRECTLY
+//    @Test
+//    public void testDeleteFromNonPKColumnIndex() throws Exception {
+//        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+//        // create unique table and index names for each parameterized test
+//        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
+//        String indexName = "IDX"  + "_" + System.currentTimeMillis();
+//        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+//        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+//        String ddl ="CREATE TABLE " + fullTableName + BaseTest.TEST_TABLE_SCHEMA + tableDDLOptions;
+//        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+//	        conn.setAutoCommit(false);
+//	        Statement stmt = conn.createStatement();
+//	        stmt.execute(ddl);
+//	        BaseTest.populateTestTable(fullTableName);
+//	        ddl = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName + " ON " + fullTableName
+//	                    + " (long_col1, long_col2)"
+//	                    + " INCLUDE (decimal_col1, decimal_col2)";
+//	        stmt.execute(ddl);
+//        }
+//        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {    
+//	        ResultSet rs;
+//	        
+//	        rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullTableName);
+//	        assertTrue(rs.next());
+//	        assertEquals(3,rs.getInt(1));
+//	        rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullIndexName);
+//	        assertTrue(rs.next());
+//	        assertEquals(3,rs.getInt(1));
+//	        
+//	        String dml = "DELETE from " + fullTableName + " WHERE long_col2 = 4";
+//	        assertEquals(1,conn.createStatement().executeUpdate(dml));
+//	        conn.commit();
+//
+//	        // query the data table
+//	        String query = "SELECT /*+ NO_INDEX */ long_pk FROM " + fullTableName;
+//	        rs = conn.createStatement().executeQuery(query);
+//	        assertTrue(rs.next());
+//	        assertEquals(1L, rs.getLong(1));
+//	        assertTrue(rs.next());
+//	        assertEquals(3L, rs.getLong(1));
+//	        assertFalse(rs.next());
+//	        
+//	        // query the index table
+//	        query = "SELECT long_pk FROM " + fullTableName + " ORDER BY long_col1";
+//	        rs = conn.createStatement().executeQuery(query);
+//	        assertTrue(rs.next());
+//	        assertEquals(1L, rs.getLong(1));
+//	        assertTrue(rs.next());
+//	        assertEquals(3L, rs.getLong(1));
+//	        assertFalse(rs.next());
+//	        
+//	        conn.createStatement().execute("DROP INDEX " + indexName + " ON " + fullTableName);
+//        }
+//    }
+    
+    @Test
+    public void testGroupByCount() throws Exception {
+    	Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+    	try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+	        conn.setAutoCommit(false);
+	        // create unique table and index names for each parameterized test
+	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
+	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
+	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+	        String ddl ="CREATE TABLE " + fullTableName + BaseTest.TEST_TABLE_SCHEMA + tableDDLOptions;
+	        Statement stmt = conn.createStatement();
+	        stmt.execute(ddl);
+	        BaseTest.populateTestTable(fullTableName);
+	        ddl = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName + " ON " + fullTableName + " (int_col2)";
+	        stmt.execute(ddl);
+	        ResultSet rs;
+	        rs = conn.createStatement().executeQuery("SELECT int_col2, COUNT(*) FROM " + fullTableName + " GROUP BY int_col2");
+	        assertTrue(rs.next());
+	        assertEquals(1,rs.getInt(2));
+    	}
+    }
+
+    @Test
+    public void testSelectDistinctOnTableWithSecondaryImmutableIndex() throws Exception {
+    	Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+    	try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+	        conn.setAutoCommit(false);
+	        // create unique table and index names for each parameterized test
+	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
+	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
+	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+	        String ddl ="CREATE TABLE " + fullTableName + BaseTest.TEST_TABLE_SCHEMA + tableDDLOptions;
+	        Statement stmt = conn.createStatement();
+	        stmt.execute(ddl);
+	        BaseTest.populateTestTable(fullTableName);
+	        ddl = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName + " ON " + fullTableName + " (int_col2)";
+            PreparedStatement pstmt = conn.prepareStatement(ddl);
+            pstmt.execute();
+            ResultSet rs = conn.createStatement().executeQuery("SELECT distinct int_col2 FROM " + fullTableName + " where int_col2 > 0");
+            assertTrue(rs.next());
+            assertEquals(3, rs.getInt(1));
+            assertTrue(rs.next());
+            assertEquals(4, rs.getInt(1));
+            assertTrue(rs.next());
+            assertEquals(5, rs.getInt(1));
+            assertFalse(rs.next());
+    	}
+    }
+
+    @Test
+    public void testInClauseWithIndexOnColumnOfUsignedIntType() throws Exception {
+    	Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+    	try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+	        conn.setAutoCommit(false);
+	        // create unique table and index names for each parameterized test
+	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
+	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
+	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+	        String ddl ="CREATE TABLE " + fullTableName + BaseTest.TEST_TABLE_SCHEMA + tableDDLOptions;
+	        Statement stmt = conn.createStatement();
+	        stmt.execute(ddl);
+	        BaseTest.populateTestTable(fullTableName);
+	        ddl = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName + " ON " + fullTableName + " (int_col1)";
+            stmt.execute(ddl);
+            ResultSet rs = conn.createStatement().executeQuery("SELECT int_col1 FROM " + fullTableName + " where int_col1 IN (1, 2, 3, 4)");
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+            assertTrue(rs.next());
+            assertEquals(3, rs.getInt(1));
+            assertTrue(rs.next());
+            assertEquals(4, rs.getInt(1));
+            assertFalse(rs.next());
+    	}
+    }
+    
+    @Test
+    public void createIndexOnTableWithSpecifiedDefaultCF() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+	        conn.setAutoCommit(false);
+	        String query;
+            ResultSet rs;
+	        // create unique table and index names for each parameterized test
+	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
+	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
+	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+	        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+	        String ddl ="CREATE TABLE " + fullTableName 
+	        		+ " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) DEFAULT_COLUMN_FAMILY='A'" + (!tableDDLOptions.isEmpty() ? "," + tableDDLOptions : "");
+	        Statement stmt = conn.createStatement();
+	        stmt.execute(ddl);
+
+	        query = "SELECT * FROM " + tableName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertFalse(rs.next());
+
+	        String options = localIndex ? "SALT_BUCKETS=10, MULTI_TENANT=true, IMMUTABLE_ROWS=true, DISABLE_WAL=true" : "";
+	        conn.createStatement().execute(
+	                "CREATE INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2) " + options);
+	        query = "SELECT * FROM " + fullIndexName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertFalse(rs.next());
+	        
+	        //check options set correctly on index
+	        TableName indexTableName = TableName.create(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+	        NamedTableNode indexNode = NamedTableNode.create(null, indexTableName, null);
+	        ColumnResolver resolver = FromCompiler.getResolver(indexNode, conn.unwrap(PhoenixConnection.class));
+	        PTable indexTable = resolver.getTables().get(0).getTable();
+	        // Can't set IMMUTABLE_ROWS, MULTI_TENANT or DEFAULT_COLUMN_FAMILY_NAME on an index
+	        assertNull(indexTable.getDefaultFamilyName());
+	        assertFalse(indexTable.isMultiTenant());
+	        assertFalse(indexTable.isImmutableRows());
+	        if(localIndex) {
+	            assertEquals(10, indexTable.getBucketNum().intValue());
+	            assertTrue(indexTable.isWALDisabled());
+	        }
+        }
+    }
+    
+    @Test
+    public void testIndexWithNullableDateCol() throws Exception {
+    	Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+	        conn.setAutoCommit(false);
+	        // create unique table and index names for each parameterized test
+	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
+	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
+	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+	        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+            Date date = new Date(System.currentTimeMillis());
+            
+            createMultiCFTestTable(fullTableName, tableDDLOptions);
+            populateMultiCFTestTable(fullTableName, date);
+            String ddl = "CREATE " + (localIndex ? " LOCAL " : "") + " INDEX " + indexName + " ON " + fullTableName + " (date_col)";
+            PreparedStatement stmt = conn.prepareStatement(ddl);
+            stmt.execute();
+            
+            String query = "SELECT int_pk from " + fullTableName ;
+            ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+            if (localIndex) {
+                assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + fullTableName +" [-32768]\n"
+                           + "    SERVER FILTER BY FIRST KEY ONLY\n"
+                           + "CLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
+            } else {
+                assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullIndexName + "\n"
+                        + "    SERVER FILTER BY FIRST KEY ONLY", QueryUtil.getExplainPlan(rs));
+            }
+            
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+            assertTrue(rs.next());
+            assertEquals(1, rs.getInt(1));
+            assertTrue(rs.next());
+            assertEquals(3, rs.getInt(1));
+            assertFalse(rs.next());
+            
+            query = "SELECT date_col from " + fullTableName + " order by date_col" ;
+            rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+            if (localIndex) {
+                assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + fullTableName + " [-32768]\n"
+                        + "    SERVER FILTER BY FIRST KEY ONLY\n"
+                        + "CLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
+            } else {
+                assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullIndexName + "\n"
+                        + "    SERVER FILTER BY FIRST KEY ONLY", QueryUtil.getExplainPlan(rs));
+            }
+            
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals(date, rs.getDate(1));
+            assertTrue(rs.next());
+            assertEquals(new Date(date.getTime() + TestUtil.MILLIS_IN_DAY), rs.getDate(1));
+            assertTrue(rs.next());
+            assertEquals(new Date(date.getTime() + 2 * TestUtil.MILLIS_IN_DAY), rs.getDate(1));
+            assertFalse(rs.next());
+        } 
+    }
+    
+    @Test
+    public void testSelectAllAndAliasWithIndex() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+	        conn.setAutoCommit(false);
+	        String query;
+	        ResultSet rs;
+	        // create unique table and index names for each parameterized test
+	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
+	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
+	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+	        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+	        
+	        String ddl = "CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " + tableDDLOptions;
+			conn.createStatement().execute(ddl);
+	        query = "SELECT * FROM " + fullTableName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertFalse(rs.next());
+	        
+	        ddl = "CREATE " + (localIndex ? " LOCAL " : "") + " INDEX " + indexName + " ON " + fullTableName + " (v2 DESC) INCLUDE (v1)";
+	        conn.createStatement().execute(ddl);
+	        query = "SELECT * FROM " + fullIndexName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertFalse(rs.next());
+
+	        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
+	        stmt.setString(1,"a");
+	        stmt.setString(2, "x");
+	        stmt.setString(3, "1");
+	        stmt.execute();
+	        stmt.setString(1,"b");
+	        stmt.setString(2, "y");
+	        stmt.setString(3, "2");
+	        stmt.execute();
+	        conn.commit();
+	        
+	        query = "SELECT * FROM " + fullTableName;
+	        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+	        if(localIndex){
+	            assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + fullTableName+" [-32768]\nCLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
+	        } else {
+	            assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullIndexName, QueryUtil.getExplainPlan(rs));
+	        }
+
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+	        assertEquals("b",rs.getString(1));
+	        assertEquals("y",rs.getString(2));
+	        assertEquals("2",rs.getString(3));
+	        assertEquals("b",rs.getString("k"));
+	        assertEquals("y",rs.getString("v1"));
+	        assertEquals("2",rs.getString("v2"));
+	        assertTrue(rs.next());
+	        assertEquals("a",rs.getString(1));
+	        assertEquals("x",rs.getString(2));
+	        assertEquals("1",rs.getString(3));
+	        assertEquals("a",rs.getString("k"));
+	        assertEquals("x",rs.getString("v1"));
+	        assertEquals("1",rs.getString("v2"));
+	        assertFalse(rs.next());
+	        
+	        query = "SELECT v1 as foo FROM " + fullTableName + " WHERE v2 = '1' ORDER BY foo";
+	        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+	        if(localIndex){
+	            assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" +fullTableName + " [-32768,~'1']\n" + 
+	                    "    SERVER SORTED BY [\"V1\"]\n" + 
+	                    "CLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
+	        } else {
+	            assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER " +fullIndexName + " [~'1']\n" + 
+	                    "    SERVER SORTED BY [\"V1\"]\n" + 
+	                    "CLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
+	        }
+
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+	        assertEquals("x",rs.getString(1));
+	        assertEquals("x",rs.getString("foo"));
+	        assertFalse(rs.next());
+        }
+    }
+    
+    @Test
+    public void testSelectCF() throws Exception {
+    	Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+	        conn.setAutoCommit(false);
+	        String query;
+	        ResultSet rs;
+	        // create unique table and index names for each parameterized test
+	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
+	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
+	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+	        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+	        
+	        String ddl = "CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, a.v1 VARCHAR, a.v2 VARCHAR, b.v1 VARCHAR) " + tableDDLOptions;
+			conn.createStatement().execute(ddl);
+	        query = "SELECT * FROM " + fullTableName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertFalse(rs.next());
+	        ddl = "CREATE " + (localIndex ? " LOCAL " : "") + " INDEX " + indexName + " ON " + fullTableName + " (v2 DESC) INCLUDE (a.v1)";
+	        conn.createStatement().execute(ddl);
+	        query = "SELECT * FROM " + fullIndexName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertFalse(rs.next());
+	
+	        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?,?)");
+	        stmt.setString(1,"a");
+	        stmt.setString(2, "x");
+	        stmt.setString(3, "1");
+	        stmt.setString(4, "A");
+	        stmt.execute();
+	        stmt.setString(1,"b");
+	        stmt.setString(2, "y");
+	        stmt.setString(3, "2");
+	        stmt.setString(4, "B");
+	        stmt.execute();
+	        conn.commit();
+	        
+	        query = "SELECT * FROM " + fullTableName;
+	        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+	        assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullTableName, QueryUtil.getExplainPlan(rs));
+	
+	        query = "SELECT a.* FROM " + fullTableName;
+	        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+	        if(localIndex) {
+	            assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + fullTableName+" [-32768]\nCLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
+	        } else {
+	            assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullIndexName, QueryUtil.getExplainPlan(rs));
+	        }
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+	        assertEquals("y",rs.getString(1));
+	        assertEquals("2",rs.getString(2));
+	        assertEquals("y",rs.getString("v1"));
+	        assertEquals("2",rs.getString("v2"));
+	        assertTrue(rs.next());
+	        assertEquals("x",rs.getString(1));
+	        assertEquals("1",rs.getString(2));
+	        assertEquals("x",rs.getString("v1"));
+	        assertEquals("1",rs.getString("v2"));
+	        assertFalse(rs.next());
+        }
+    }
+    
+    @Test
+    public void testUpsertAfterIndexDrop() throws Exception {
+    	Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+	        conn.setAutoCommit(false);
+	        String query;
+	        ResultSet rs;
+	        // create unique table and index names for each parameterized test
+	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
+	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
+	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+	        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+    
+	        // make sure that the tables are empty, but reachable
+	        conn.createStatement().execute(
+	          "CREATE TABLE " + fullTableName
+	              + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + tableDDLOptions);
+	        query = "SELECT * FROM " + fullTableName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertFalse(rs.next());
+	    
+	        conn.createStatement().execute(
+	          "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + indexName + " ON " + fullTableName + " (v1, v2)");
+	        query = "SELECT * FROM " + fullIndexName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertFalse(rs.next());
+	    
+	        // load some data into the table
+	        PreparedStatement stmt =
+	            conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
+	        stmt.setString(1, "a");
+	        stmt.setString(2, "x");
+	        stmt.setString(3, "1");
+	        stmt.execute();
+	        conn.commit();
+	        
+	        // make sure the index is working as expected
+	        query = "SELECT * FROM " + fullIndexName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+	        assertEquals("x", rs.getString(1));
+	        assertEquals("1", rs.getString(2));
+	        assertEquals("a", rs.getString(3));
+	        assertFalse(rs.next());
+	
+	        String ddl = "DROP INDEX " + indexName + " ON " + fullTableName;
+	        stmt = conn.prepareStatement(ddl);
+	        stmt.execute();
+	        
+	        stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + "(k, v1) VALUES(?,?)");
+	        stmt.setString(1, "a");
+	        stmt.setString(2, "y");
+	        stmt.execute();
+	        conn.commit();
+	    
+	        query = "SELECT * FROM " + fullTableName;
+	    
+	        // check that the data table matches as expected
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+	        assertEquals("a", rs.getString(1));
+	        assertEquals("y", rs.getString(2));
+	        assertFalse(rs.next());
+        }
+    }
+    
+    @Test
+    public void testMultipleUpdatesAcrossRegions() throws Exception {
+    	Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+	        conn.setAutoCommit(false);
+	        String query;
+	        ResultSet rs;
+	        // create unique table and index names for each parameterized test
+	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
+	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
+	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+	        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+    
+	        // make sure that the tables are empty, but reachable
+	        conn.createStatement().execute(
+	          "CREATE TABLE " + fullTableName
+	              + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) "  + HTableDescriptor.MAX_FILESIZE + "=1, " + HTableDescriptor.MEMSTORE_FLUSHSIZE + "=1 " 
+	        		  + (!tableDDLOptions.isEmpty() ? "," + tableDDLOptions : "") + "SPLIT ON ('b')");
+	        query = "SELECT * FROM " + fullTableName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertFalse(rs.next());
+	
+	        conn.createStatement().execute(
+	  	          "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + indexName + " ON " + fullTableName + " (v1, v2)");
+	        query = "SELECT * FROM " + fullIndexName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertFalse(rs.next());
+	    
+	        // load some data into the table
+	        PreparedStatement stmt =
+	            conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
+	        stmt.setString(1, "a");
+	        stmt.setString(2, "x");
+	        stmt.setString(3, "1");
+	        stmt.execute();
+	        stmt.setString(1, "b");
+	        stmt.setString(2, "y");
+	        stmt.setString(3, "2");
+	        stmt.execute();
+	        stmt.setString(1, "c");
+	        stmt.setString(2, "z");
+	        stmt.setString(3, "3");
+	        stmt.execute();
+	        conn.commit();
+	        
+	        // make sure the index is working as expected
+	        query = "SELECT * FROM " + fullIndexName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+	        assertEquals("x", rs.getString(1));
+	        assertEquals("1", rs.getString(2));
+	        assertEquals("a", rs.getString(3));
+	        assertTrue(rs.next());
+	        assertEquals("y", rs.getString(1));
+	        assertEquals("2", rs.getString(2));
+	        assertEquals("b", rs.getString(3));
+	        assertTrue(rs.next());
+	        assertEquals("z", rs.getString(1));
+	        assertEquals("3", rs.getString(2));
+	        assertEquals("c", rs.getString(3));
+	        assertFalse(rs.next());
+	
+	        query = "SELECT * FROM " + fullTableName;
+	        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+	        if (localIndex) {
+	            assertEquals("CLIENT PARALLEL 2-WAY RANGE SCAN OVER _LOCAL_IDX_" + fullTableName+" [-32768]\n"
+	                       + "    SERVER FILTER BY FIRST KEY ONLY\n"
+	                       + "CLIENT MERGE SORT",
+	                QueryUtil.getExplainPlan(rs));
+	        } else {
+	            assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullIndexName + "\n"
+	                    + "    SERVER FILTER BY FIRST KEY ONLY",
+	                QueryUtil.getExplainPlan(rs));
+	        }
+	    
+	        // check that the data table matches as expected
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+	        assertEquals("a", rs.getString(1));
+	        assertEquals("x", rs.getString(2));
+	        assertEquals("1", rs.getString(3));
+	        assertTrue(rs.next());
+	        assertEquals("b", rs.getString(1));
+	        assertEquals("y", rs.getString(2));
+	        assertEquals("2", rs.getString(3));
+	        assertTrue(rs.next());
+	        assertEquals("c", rs.getString(1));
+	        assertEquals("z", rs.getString(2));
+	        assertEquals("3", rs.getString(3));
+	        assertFalse(rs.next());
+        }
+    }
+    
+    @Test
+    public void testIndexWithCaseSensitiveCols() throws Exception {
+    	Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+	        conn.setAutoCommit(false);
+	        String query;
+	        ResultSet rs;
+	        // create unique table and index names for each parameterized test
+	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
+	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
+	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+	        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+	        
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, \"V1\" VARCHAR, \"v2\" VARCHAR)"+tableDDLOptions);
+            query = "SELECT * FROM "+fullTableName;
+            rs = conn.createStatement().executeQuery(query);
+            assertFalse(rs.next());
+            conn.createStatement().execute(
+  	  	          "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + indexName + " ON " + fullTableName + "(\"v2\") INCLUDE (\"V1\")");
+            query = "SELECT * FROM "+fullIndexName;
+            rs = conn.createStatement().executeQuery(query);
+            assertFalse(rs.next());
+
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
+            stmt.setString(1,"a");
+            stmt.setString(2, "x");
+            stmt.setString(3, "1");
+            stmt.execute();
+            stmt.setString(1,"b");
+            stmt.setString(2, "y");
+            stmt.setString(3, "2");
+            stmt.execute();
+            conn.commit();
+
+            query = "SELECT * FROM " + fullTableName + " WHERE \"v2\" = '1'";
+            rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+            if(localIndex){
+                assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + fullTableName + " [-32768,'1']\n"
+                           + "CLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
+            } else {
+                assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + fullIndexName + " ['1']", QueryUtil.getExplainPlan(rs));
+            }
+
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals("a",rs.getString(1));
+            assertEquals("x",rs.getString(2));
+            assertEquals("1",rs.getString(3));
+            assertEquals("a",rs.getString("k"));
+            assertEquals("x",rs.getString("V1"));
+            assertEquals("1",rs.getString("v2"));
+            assertFalse(rs.next());
+
+            query = "SELECT \"V1\", \"V1\" as foo1, \"v2\" as foo, \"v2\" as \"Foo1\", \"v2\" FROM " + fullTableName + " ORDER BY foo";
+            rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+            if(localIndex){
+                assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + fullTableName + " [-32768]\nCLIENT MERGE SORT",
+                    QueryUtil.getExplainPlan(rs));
+            } else {
+                assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER "+fullIndexName, QueryUtil.getExplainPlan(rs));
+            }
+
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals("x",rs.getString(1));
+            assertEquals("x",rs.getString("V1"));
+            assertEquals("x",rs.getString(2));
+            assertEquals("x",rs.getString("foo1"));
+            assertEquals("1",rs.getString(3));
+            assertEquals("1",rs.getString("Foo"));
+            assertEquals("1",rs.getString(4));
+            assertEquals("1",rs.getString("Foo1"));
+            assertEquals("1",rs.getString(5));
+            assertEquals("1",rs.getString("v2"));
+            assertTrue(rs.next());
+            assertEquals("y",rs.getString(1));
+            assertEquals("y",rs.getString("V1"));
+            assertEquals("y",rs.getString(2));
+            assertEquals("y",rs.getString("foo1"));
+            assertEquals("2",rs.getString(3));
+            assertEquals("2",rs.getString("Foo"));
+            assertEquals("2",rs.getString(4));
+            assertEquals("2",rs.getString("Foo1"));
+            assertEquals("2",rs.getString(5));
+            assertEquals("2",rs.getString("v2"));
+            assertFalse(rs.next());
+        } 
+    }
+
+    @Test
+    public void testInFilterOnIndexedTable() throws Exception {
+    	Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+	        conn.setAutoCommit(false);
+	        String query;
+	        ResultSet rs;
+	        // create unique table and index names for each parameterized test
+	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
+	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
+	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+	        
+	        String ddl = "CREATE TABLE " + fullTableName +"  (PK1 CHAR(2) NOT NULL PRIMARY KEY, CF1.COL1 BIGINT) " + tableDDLOptions;
+	        conn.createStatement().execute(ddl);
+	        ddl = "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + indexName + " ON " + fullTableName + "(COL1)";
+	        conn.createStatement().execute(ddl);
+	
+	        query = "SELECT COUNT(COL1) FROM " + fullTableName +" WHERE COL1 IN (1,25,50,75,100)"; 
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+        } 
+    }
+
+    @Test
+    public void testIndexWithDecimalCol() throws Exception {
+    	Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+	        conn.setAutoCommit(false);
+	        String query;
+	        ResultSet rs;
+	        // create unique table and index names for each parameterized test
+	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
+	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
+	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+	        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+            Date date = new Date(System.currentTimeMillis());
+            
+            createMultiCFTestTable(fullTableName, tableDDLOptions);
+            populateMultiCFTestTable(fullTableName, date);
+            String ddl = null;
+            ddl = "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + indexName + " ON " + fullTableName + " (decimal_pk) INCLUDE (decimal_col1, decimal_col2)";
+            PreparedStatement stmt = conn.prepareStatement(ddl);
+            stmt.execute();
+            
+            query = "SELECT decimal_pk, decimal_col1, decimal_col2 from " + fullTableName ;
+            rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+            if(localIndex) {
+                assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + fullTableName+" [-32768]\nCLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
+            } else {
+                assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullIndexName, QueryUtil.getExplainPlan(rs));
+            }
+            
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals(new BigDecimal("1.1"), rs.getBigDecimal(1));
+            assertEquals(new BigDecimal("2.1"), rs.getBigDecimal(2));
+            assertEquals(new BigDecimal("3.1"), rs.getBigDecimal(3));
+            assertTrue(rs.next());
+            assertEquals(new BigDecimal("2.2"), rs.getBigDecimal(1));
+            assertEquals(new BigDecimal("3.2"), rs.getBigDecimal(2));
+            assertEquals(new BigDecimal("4.2"), rs.getBigDecimal(3));
+            assertTrue(rs.next());
+            assertEquals(new BigDecimal("3.3"), rs.getBigDecimal(1));
+            assertEquals(new BigDecimal("4.3"), rs.getBigDecimal(2));
+            assertEquals(new BigDecimal("5.3"), rs.getBigDecimal(3));
+            assertFalse(rs.next());
+        } 
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalMutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalMutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalMutableIndexIT.java
deleted file mode 100644
index fe17dbc..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalMutableIndexIT.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.end2end.index;
-
-public class LocalMutableIndexIT extends BaseMutableIndexIT {
-
-    public LocalMutableIndexIT() {
-        super(true);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index d11c059..e2af717 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -26,11 +26,13 @@ import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
@@ -73,6 +75,9 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 /**
  * 
  * Test for failure of region server to write to index table.
@@ -84,9 +89,9 @@ import org.junit.experimental.categories.Category;
  */
 
 @Category(NeedsOwnMiniClusterTest.class)
+@RunWith(Parameterized.class)
 public class MutableIndexFailureIT extends BaseTest {
     private static final int NUM_SLAVES = 4;
-    private static String url;
     private static PhoenixTestDriver driver;
     private static HBaseTestingUtility util;
     private Timer scheduleTimer;
@@ -95,7 +100,15 @@ public class MutableIndexFailureIT extends BaseTest {
     private static final String INDEX_TABLE_NAME = "I";
     private static final String DATA_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, "T");
     private static final String INDEX_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, "I");
-
+    
+    private boolean transactional; 
+	private final String tableDDLOptions;
+    
+    public MutableIndexFailureIT(boolean transactional) {
+		this.transactional = transactional;
+		this.tableDDLOptions = transactional ? " TRANSACTIONAL=true " : "";
+    }
+    
     @Before
     public void doSetup() throws Exception {
         Configuration conf = HBaseConfiguration.create();
@@ -113,6 +126,15 @@ public class MutableIndexFailureIT extends BaseTest {
         url = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST + JDBC_PROTOCOL_SEPARATOR + clientPort
                 + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
         driver = initAndRegisterDriver(url, ReadOnlyProps.EMPTY_PROPS);
+		clusterInitialized = true;
+		setupTxManager();
+    }
+	
+	@Parameters(name="transactional = {0}")
+    public static Collection<Boolean[]> data() {
+        return Arrays.asList(new Boolean[][] {     
+                 { false}, {true } 
+           });
     }
 
     @After
@@ -133,227 +155,275 @@ public class MutableIndexFailureIT extends BaseTest {
 
     @Test(timeout=300000)
     public void testWriteFailureDisablesLocalIndex() throws Exception {
-        testWriteFailureDisablesIndex(true);
+    	helpTestWriteFailureDisablesIndex(true);
     }
- 
+    
     @Test(timeout=300000)
-    public void testWriteFailureDisablesIndex() throws Exception {
-        testWriteFailureDisablesIndex(false);
+    public void testWriteFailureDisablesGlobalIndex() throws Exception {
+    	helpTestWriteFailureDisablesIndex(false);
     }
-    
-    public void testWriteFailureDisablesIndex(boolean localIndex) throws Exception {
-        String query;
-        ResultSet rs;
 
+    private void helpTestWriteFailureDisablesIndex(boolean localIndex) throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = driver.connect(url, props);
-        conn.setAutoCommit(false);
-        conn.createStatement().execute(
-                "CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
-        query = "SELECT * FROM " + DATA_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertFalse(rs.next());
-
-        if(localIndex) {
-            conn.createStatement().execute(
-                "CREATE LOCAL INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)");
-            conn.createStatement().execute(
-                "CREATE LOCAL INDEX " + INDEX_TABLE_NAME+ "_2" + " ON " + DATA_TABLE_FULL_NAME + " (v2) INCLUDE (v1)");
-        } else {
-            conn.createStatement().execute(
-                "CREATE INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)");
+        try (Connection conn = driver.connect(url, props);) {
+        	String query;
+        	ResultSet rs;
+	        conn.setAutoCommit(false);
+	        conn.createStatement().execute(
+	                "CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + tableDDLOptions);
+	        query = "SELECT * FROM " + DATA_TABLE_FULL_NAME;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertFalse(rs.next());
+	
+	        if(localIndex) {
+	            conn.createStatement().execute(
+	                "CREATE LOCAL INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)");
+	            conn.createStatement().execute(
+	                "CREATE LOCAL INDEX " + INDEX_TABLE_NAME+ "_2" + " ON " + DATA_TABLE_FULL_NAME + " (v2) INCLUDE (v1)");
+	        } else {
+	            conn.createStatement().execute(
+	                "CREATE INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)");
+	        }
+	            
+	        query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertFalse(rs.next());
+	
+	        // Verify the metadata for index is correct.
+	        rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
+	                new String[] { PTableType.INDEX.toString() });
+	        assertTrue(rs.next());
+	        assertEquals(INDEX_TABLE_NAME, rs.getString(3));
+	        assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
+	        assertFalse(rs.next());
+	        
+	        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
+	        stmt.setString(1, "a");
+	        stmt.setString(2, "x");
+	        stmt.setString(3, "1");
+	        stmt.execute();
+	        conn.commit();
+	
+	        TableName indexTable =
+	                TableName.valueOf(localIndex ? MetaDataUtil
+	                        .getLocalIndexTableName(DATA_TABLE_FULL_NAME) : INDEX_TABLE_FULL_NAME);
+	        HBaseAdmin admin = this.util.getHBaseAdmin();
+	        HTableDescriptor indexTableDesc = admin.getTableDescriptor(indexTable);
+	        try{
+	          admin.disableTable(indexTable);
+	          admin.deleteTable(indexTable);
+	        } catch (TableNotFoundException ignore) {}
+	
+	        stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
+	        stmt.setString(1, "a2");
+	        stmt.setString(2, "x2");
+	        stmt.setString(3, "2");
+	        stmt.execute();
+	        
+	        if (transactional) {
+	            try {
+	                conn.commit();
+	                fail();
+	            } catch (SQLException e1) {
+		        	try {
+		        		conn.rollback();
+		        		fail();
+		        	} catch (SQLException e2) {
+		        		// rollback fails as well because index is disabled
+		        	}
+	            }
+	        }
+	        else {
+	        	conn.commit();
+	        }
+	        
+	        // Verify the metadata for index is correct.
+	        rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
+	                new String[] { PTableType.INDEX.toString() });
+	        assertTrue(rs.next());
+	        assertEquals(INDEX_TABLE_NAME, rs.getString(3));
+	        // if the table is transactional, the index will not be disabled if there is a failure
+	        PIndexState indexState = transactional ? PIndexState.ACTIVE : PIndexState.DISABLE;
+			assertEquals(indexState.toString(), rs.getString("INDEX_STATE"));
+	        assertFalse(rs.next());
+	        if(localIndex) {
+	            rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME+"_2",
+	                new String[] { PTableType.INDEX.toString() });
+	            assertTrue(rs.next());
+	            assertEquals(INDEX_TABLE_NAME+"_2", rs.getString(3));
+	            assertEquals(indexState.toString(), rs.getString("INDEX_STATE"));
+	            assertFalse(rs.next());
+	        }
+	
+	        // if the table is transactional the write to the index table  will fail because the index has not been disabled
+	        if (!transactional) {
+		        // Verify UPSERT on data table still work after index is disabled       
+		        stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
+		        stmt.setString(1, "a3");
+		        stmt.setString(2, "x3");
+		        stmt.setString(3, "3");
+		        stmt.execute();
+		        conn.commit();
+	        }
+	        
+	        if (transactional) {
+	        	// if the table was transactional there should be 1 row (written before the index was disabled)
+	        	query = "SELECT /*+ NO_INDEX */ v2 FROM " + DATA_TABLE_FULL_NAME;
+		        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+		        String expectedPlan = "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + DATA_TABLE_FULL_NAME;
+				assertEquals(expectedPlan, QueryUtil.getExplainPlan(rs));
+		        rs = conn.createStatement().executeQuery(query);
+		        assertTrue(rs.next());
+	        	assertEquals("1", rs.getString(1));
+	        	assertFalse(rs.next());
+	        } else {
+	        	// if the table was not transactional there should be three rows (all writes to data table should succeed)
+		        query = "SELECT v2 FROM " + DATA_TABLE_FULL_NAME;
+		        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+		        String expectedPlan = "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + DATA_TABLE_FULL_NAME;
+				assertEquals(expectedPlan, QueryUtil.getExplainPlan(rs));
+		        rs = conn.createStatement().executeQuery(query);
+	        	assertTrue(rs.next());
+	        	assertEquals("1", rs.getString(1));
+	        	assertTrue(rs.next());
+	        	assertEquals("2", rs.getString(1));
+	        	assertTrue(rs.next());
+	        	assertEquals("3", rs.getString(1));
+	        	assertFalse(rs.next());
+	        }
+	        
+	        // recreate index table
+	        admin.createTable(indexTableDesc);
+	        do {
+	          rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
+	              new String[] { PTableType.INDEX.toString() });
+	          assertTrue(rs.next());
+	          if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
+	              break;
+	          }
+	          if(localIndex) {
+	              rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME+"_2",
+	                  new String[] { PTableType.INDEX.toString() });
+	              assertTrue(rs.next());
+	              if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
+	                  break;
+	              }
+	          }
+	          Thread.sleep(15 * 1000); // sleep 15 secs
+	        } while(true);
+	        
+	        // Verify UPSERT on data table still work after index table is recreated       
+	        stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
+	        stmt.setString(1, "a4");
+	        stmt.setString(2, "x4");
+	        stmt.setString(3, "4");
+	        stmt.execute();
+	        conn.commit();
+	        
+	        // verify index table has data
+	        query = "SELECT count(1) FROM " + INDEX_TABLE_FULL_NAME;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+	        
+	        // for txn tables there will be only one row in the index (a4)
+	        // for non txn tables there will be three rows because we only partially build index from where we failed and the oldest 
+	        // index row has been deleted when we dropped the index table during test
+	        assertEquals( transactional ? 1: 3, rs.getInt(1));
         }
-            
-        query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertFalse(rs.next());
-
-        // Verify the metadata for index is correct.
-        rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
-                new String[] { PTableType.INDEX.toString() });
-        assertTrue(rs.next());
-        assertEquals(INDEX_TABLE_NAME, rs.getString(3));
-        assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
-        assertFalse(rs.next());
-        
-        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
-        stmt.setString(1, "a");
-        stmt.setString(2, "x");
-        stmt.setString(3, "1");
-        stmt.execute();
-        conn.commit();
-
-        TableName indexTable =
-                TableName.valueOf(localIndex ? MetaDataUtil
-                        .getLocalIndexTableName(DATA_TABLE_FULL_NAME) : INDEX_TABLE_FULL_NAME);
-        HBaseAdmin admin = this.util.getHBaseAdmin();
-        HTableDescriptor indexTableDesc = admin.getTableDescriptor(indexTable);
-        try{
-          admin.disableTable(indexTable);
-          admin.deleteTable(indexTable);
-        } catch (TableNotFoundException ignore) {}
-
-        stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
-        stmt.setString(1, "a2");
-        stmt.setString(2, "x2");
-        stmt.setString(3, "2");
-        stmt.execute();
-        try {
-            conn.commit();
-        } catch (SQLException e) {}
-
-        // Verify the metadata for index is correct.
-        rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
-                new String[] { PTableType.INDEX.toString() });
-        assertTrue(rs.next());
-        assertEquals(INDEX_TABLE_NAME, rs.getString(3));
-        assertEquals(PIndexState.DISABLE.toString(), rs.getString("INDEX_STATE"));
-        assertFalse(rs.next());
-        if(localIndex) {
-            rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME+"_2",
-                new String[] { PTableType.INDEX.toString() });
-            assertTrue(rs.next());
-            assertEquals(INDEX_TABLE_NAME+"_2", rs.getString(3));
-            assertEquals(PIndexState.DISABLE.toString(), rs.getString("INDEX_STATE"));
-            assertFalse(rs.next());
-        }
-
-        // Verify UPSERT on data table still work after index is disabled       
-        stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
-        stmt.setString(1, "a3");
-        stmt.setString(2, "x3");
-        stmt.setString(3, "3");
-        stmt.execute();
-        conn.commit();
-        
-        query = "SELECT v2 FROM " + DATA_TABLE_FULL_NAME + " where v1='x3'";
-        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
-        assertTrue(QueryUtil.getExplainPlan(rs).contains("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + DATA_TABLE_FULL_NAME));
-        rs = conn.createStatement().executeQuery(query);
-        assertTrue(rs.next());
-        
-        // recreate index table
-        admin.createTable(indexTableDesc);
-        do {
-          Thread.sleep(15 * 1000); // sleep 15 secs
-          rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
-              new String[] { PTableType.INDEX.toString() });
-          assertTrue(rs.next());
-          if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
-              break;
-          }
-          if(localIndex) {
-              rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME+"_2",
-                  new String[] { PTableType.INDEX.toString() });
-              assertTrue(rs.next());
-              if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
-                  break;
-              }
-          }
-        } while(true);
-        
-        // verify index table has data
-        query = "SELECT count(1) FROM " + INDEX_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertTrue(rs.next());
-        
-        // using 2 here because we only partially build index from where we failed and the oldest 
-        // index row has been deleted when we dropped the index table during test.
-        assertEquals(2, rs.getInt(1));
     }
     
     @Test(timeout=300000)
     public void testWriteFailureWithRegionServerDown() throws Exception {
-        String query;
-        ResultSet rs;
-
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = driver.connect(url, props);
-        conn.setAutoCommit(false);
-        conn.createStatement().execute(
-                "CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
-        query = "SELECT * FROM " + DATA_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertFalse(rs.next());
-
-        conn.createStatement().execute(
-                "CREATE INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)");
-        query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertFalse(rs.next());
-
-        // Verify the metadata for index is correct.
-        rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
-                new String[] { PTableType.INDEX.toString() });
-        assertTrue(rs.next());
-        assertEquals(INDEX_TABLE_NAME, rs.getString(3));
-        assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
-        assertFalse(rs.next());
-        
-        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
-        stmt.setString(1, "a");
-        stmt.setString(2, "x");
-        stmt.setString(3, "1");
-        stmt.execute();
-        conn.commit();
-        
-        // find a RS which doesn't has CATALOG table
-        TableName catalogTable = TableName.valueOf("SYSTEM.CATALOG");
-        TableName indexTable = TableName.valueOf(INDEX_TABLE_FULL_NAME);
-        final HBaseCluster cluster = this.util.getHBaseCluster();
-        Collection<ServerName> rss = cluster.getClusterStatus().getServers();
-        HBaseAdmin admin = this.util.getHBaseAdmin();
-        List<HRegionInfo> regions = admin.getTableRegions(catalogTable);
-        ServerName catalogRS = cluster.getServerHoldingRegion(regions.get(0).getRegionName());
-        ServerName metaRS = cluster.getServerHoldingMeta();
-        ServerName rsToBeKilled = null;
-        
-        // find first RS isn't holding META or CATALOG table
-        for(ServerName curRS : rss) {
-            if(!curRS.equals(catalogRS) && !metaRS.equals(curRS)) {
-                rsToBeKilled = curRS;
-                break;
-            }
+        try (Connection conn = driver.connect(url, props);) {
+        	String query;
+        	ResultSet rs;
+	        conn.setAutoCommit(false);
+	        conn.createStatement().execute(
+	                "CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+	        query = "SELECT * FROM " + DATA_TABLE_FULL_NAME;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertFalse(rs.next());
+	
+	        conn.createStatement().execute(
+	                "CREATE INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)");
+	        query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertFalse(rs.next());
+	
+	        // Verify the metadata for index is correct.
+	        rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
+	                new String[] { PTableType.INDEX.toString() });
+	        assertTrue(rs.next());
+	        assertEquals(INDEX_TABLE_NAME, rs.getString(3));
+	        assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
+	        assertFalse(rs.next());
+	        
+	        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
+	        stmt.setString(1, "a");
+	        stmt.setString(2, "x");
+	        stmt.setString(3, "1");
+	        stmt.execute();
+	        conn.commit();
+	        
+	        // find a RS which doesn't has CATALOG table
+	        TableName catalogTable = TableName.valueOf("SYSTEM.CATALOG");
+	        TableName indexTable = TableName.valueOf(INDEX_TABLE_FULL_NAME);
+	        final HBaseCluster cluster = this.util.getHBaseCluster();
+	        Collection<ServerName> rss = cluster.getClusterStatus().getServers();
+	        HBaseAdmin admin = this.util.getHBaseAdmin();
+	        List<HRegionInfo> regions = admin.getTableRegions(catalogTable);
+	        ServerName catalogRS = cluster.getServerHoldingRegion(regions.get(0).getRegionName());
+	        ServerName metaRS = cluster.getServerHoldingMeta();
+	        ServerName rsToBeKilled = null;
+	        
+	        // find first RS isn't holding META or CATALOG table
+	        for(ServerName curRS : rss) {
+	            if(!curRS.equals(catalogRS) && !metaRS.equals(curRS)) {
+	                rsToBeKilled = curRS;
+	                break;
+	            }
+	        }
+	        assertTrue(rsToBeKilled != null);
+	        
+	        regions = admin.getTableRegions(indexTable);
+	        final HRegionInfo indexRegion = regions.get(0);
+	        final ServerName dstRS = rsToBeKilled;
+	        admin.move(indexRegion.getEncodedNameAsBytes(), Bytes.toBytes(rsToBeKilled.getServerName()));
+	        this.util.waitFor(30000, 200, new Waiter.Predicate<Exception>() {
+	            @Override
+	            public boolean evaluate() throws Exception {
+	              ServerName sn = cluster.getServerHoldingRegion(indexRegion.getRegionName());
+	              return (sn != null && sn.equals(dstRS));
+	            }
+	          });
+	        
+	        // use timer sending updates in every 10ms
+	        this.scheduleTimer = new Timer(true);
+	        this.scheduleTimer.schedule(new SendingUpdatesScheduleTask(conn), 0, 10);
+	        // let timer sending some updates
+	        Thread.sleep(100);
+	        
+	        // kill RS hosting index table
+	        this.util.getHBaseCluster().killRegionServer(rsToBeKilled);
+	        
+	        // wait for index table completes recovery
+	        this.util.waitUntilAllRegionsAssigned(indexTable);
+	        
+	        // Verify the metadata for index is correct.       
+	        do {
+	          Thread.sleep(15 * 1000); // sleep 15 secs
+	          rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
+	              new String[] { PTableType.INDEX.toString() });
+	          assertTrue(rs.next());
+	          if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
+	              break;
+	          }
+	        } while(true);
+	        this.scheduleTimer.cancel();
+	        
+	        assertEquals(cluster.getClusterStatus().getDeadServers(), 1);
         }
-        assertTrue(rsToBeKilled != null);
-        
-        regions = admin.getTableRegions(indexTable);
-        final HRegionInfo indexRegion = regions.get(0);
-        final ServerName dstRS = rsToBeKilled;
-        admin.move(indexRegion.getEncodedNameAsBytes(), Bytes.toBytes(rsToBeKilled.getServerName()));
-        this.util.waitFor(30000, 200, new Waiter.Predicate<Exception>() {
-            @Override
-            public boolean evaluate() throws Exception {
-              ServerName sn = cluster.getServerHoldingRegion(indexRegion.getRegionName());
-              return (sn != null && sn.equals(dstRS));
-            }
-          });
-        
-        // use timer sending updates in every 10ms
-        this.scheduleTimer = new Timer(true);
-        this.scheduleTimer.schedule(new SendingUpdatesScheduleTask(conn), 0, 10);
-        // let timer sending some updates
-        Thread.sleep(100);
-        
-        // kill RS hosting index table
-        this.util.getHBaseCluster().killRegionServer(rsToBeKilled);
-        
-        // wait for index table completes recovery
-        this.util.waitUntilAllRegionsAssigned(indexTable);
-        
-        // Verify the metadata for index is correct.       
-        do {
-          Thread.sleep(15 * 1000); // sleep 15 secs
-          rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
-              new String[] { PTableType.INDEX.toString() });
-          assertTrue(rs.next());
-          if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
-              break;
-          }
-        } while(true);
-        this.scheduleTimer.cancel();
-        
-        assertEquals(cluster.getClusterStatus().getDeadServers(), 1);
     }
     
     static class SendingUpdatesScheduleTask extends TimerTask {


[4/4] phoenix git commit: Increase testing around transaction integration PHOENIX-1900

Posted by td...@apache.org.
Increase testing around transaction integration PHOENIX-1900


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d81e660e
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d81e660e
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d81e660e

Branch: refs/heads/txn
Commit: d81e660e36205acdfce1b928e0154cf25a2557f6
Parents: acacaf3
Author: Thomas D'Silva <td...@salesforce.com>
Authored: Thu Sep 17 12:07:25 2015 -0700
Committer: Thomas D'Silva <td...@salesforce.com>
Committed: Mon Oct 26 15:38:22 2015 -0700

----------------------------------------------------------------------
 .../apache/phoenix/end2end/SkipScanQueryIT.java |    2 +-
 .../end2end/index/BaseMutableIndexIT.java       | 1175 ------------------
 .../end2end/index/GlobalMutableIndexIT.java     |   26 -
 .../phoenix/end2end/index/ImmutableIndexIT.java |  460 +------
 .../apache/phoenix/end2end/index/IndexIT.java   |  884 +++++++++++++
 .../end2end/index/LocalMutableIndexIT.java      |   26 -
 .../end2end/index/MutableIndexFailureIT.java    |  492 ++++----
 .../phoenix/end2end/index/MutableIndexIT.java   |  602 +++++++++
 .../end2end/index/TxGlobalMutableIndexIT.java   |   85 --
 .../end2end/index/TxImmutableIndexIT.java       |  121 --
 .../end2end/index/txn/MutableRollbackIT.java    |  260 ++++
 .../phoenix/end2end/index/txn/RollbackIT.java   |  144 +++
 .../end2end/index/txn/TxWriteFailureIT.java     |  198 +++
 .../org/apache/phoenix/tx/TransactionIT.java    |   14 +-
 .../org/apache/phoenix/tx/TxCheckpointIT.java   |  412 +++---
 .../apache/phoenix/tx/TxPointInTimeQueryIT.java |   26 +-
 .../apache/phoenix/compile/FromCompiler.java    |    4 +-
 .../coprocessor/MetaDataEndpointImpl.java       |    4 +-
 .../phoenix/exception/SQLExceptionCode.java     |    2 +-
 .../apache/phoenix/execute/MutationState.java   |  100 +-
 .../apache/phoenix/index/IndexMaintainer.java   |   16 +-
 .../apache/phoenix/index/PhoenixIndexCodec.java |    5 +-
 .../index/PhoenixTransactionalIndexer.java      |  129 +-
 .../apache/phoenix/optimize/QueryOptimizer.java |    3 +-
 .../apache/phoenix/schema/MetaDataClient.java   |   12 +-
 .../org/apache/phoenix/schema/PTableImpl.java   |    2 +-
 .../java/org/apache/phoenix/util/IndexUtil.java |    3 +-
 .../org/apache/phoenix/util/ServerUtil.java     |    5 +-
 .../apache/phoenix/util/TransactionUtil.java    |   46 +-
 .../java/org/apache/phoenix/query/BaseTest.java |   88 +-
 pom.xml                                         |    2 +-
 31 files changed, 3008 insertions(+), 2340 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanQueryIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanQueryIT.java
index 86608fb..1937f65 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanQueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanQueryIT.java
@@ -308,7 +308,7 @@ public class SkipScanQueryIT extends BaseHBaseManagedTimeIT {
         Connection conn = DriverManager.getConnection(getUrl(), props);
         conn.setAutoCommit(false);
         try {
-            createMultiCFTestTable(TestUtil.DEFAULT_DATA_TABLE_FULL_NAME);
+            createMultiCFTestTable(TestUtil.DEFAULT_DATA_TABLE_FULL_NAME, null);
             populateMultiCFTestTable(TestUtil.DEFAULT_DATA_TABLE_FULL_NAME);
             String upsert = "UPSERT INTO " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME
                     + " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseMutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseMutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseMutableIndexIT.java
deleted file mode 100644
index 000787b..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseMutableIndexIT.java
+++ /dev/null
@@ -1,1175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.end2end.index;
-
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.math.BigDecimal;
-import java.sql.Connection;
-import java.sql.Date;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.Statement;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.phoenix.compile.ColumnResolver;
-import org.apache.phoenix.compile.FromCompiler;
-import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
-import org.apache.phoenix.end2end.Shadower;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.parse.NamedTableNode;
-import org.apache.phoenix.parse.TableName;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.util.MetaDataUtil;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.QueryUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.util.TestUtil;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import com.google.common.collect.Maps;
-import com.google.common.primitives.Doubles;
-
-
-public abstract class BaseMutableIndexIT extends BaseHBaseManagedTimeIT {
-    
-    @BeforeClass
-    @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
-    public static void doSetup() throws Exception {
-        Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
-        // Don't split intra region so we can more easily know that the n-way parallelization is for the explain plan
-        // Forces server cache to be used
-        props.put(QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB, Integer.toString(2));
-        props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
-        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
-    }
-    
-    private final boolean localIndex;
-    
-    BaseMutableIndexIT(boolean localIndex) {
-        this.localIndex = localIndex;
-    }
-    
-    @Test
-    public void createIndexOnTableWithSpecifiedDefaultCF() throws Exception {
-        String query;
-        ResultSet rs;
-
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.createStatement().execute(
-                "CREATE TABLE " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) DEFAULT_COLUMN_FAMILY='A'");
-        query = "SELECT * FROM " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertFalse(rs.next());
-
-        String options = localIndex ? "SALT_BUCKETS=10, MULTI_TENANT=true, IMMUTABLE_ROWS=true, DISABLE_WAL=true" : "";
-        conn.createStatement().execute(
-                "CREATE INDEX " + TestUtil.DEFAULT_INDEX_TABLE_NAME + " ON " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2) " + options);
-        query = "SELECT * FROM " + TestUtil.DEFAULT_INDEX_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertFalse(rs.next());
-        
-        //check options set correctly on index
-        TableName indexName = TableName.create(TestUtil.DEFAULT_SCHEMA_NAME, TestUtil.DEFAULT_INDEX_TABLE_NAME);
-        NamedTableNode indexNode = NamedTableNode.create(null, indexName, null);
-        ColumnResolver resolver = FromCompiler.getResolver(indexNode, conn.unwrap(PhoenixConnection.class));
-        PTable indexTable = resolver.getTables().get(0).getTable();
-        // Can't set IMMUTABLE_ROWS, MULTI_TENANT or DEFAULT_COLUMN_FAMILY_NAME on an index
-        assertNull(indexTable.getDefaultFamilyName());
-        assertFalse(indexTable.isMultiTenant());
-        assertFalse(indexTable.isImmutableRows());
-        if(localIndex) {
-            assertEquals(10, indexTable.getBucketNum().intValue());
-            assertTrue(indexTable.isWALDisabled());
-        }
-    }
-
-    @Test
-    public void testIndexWithNullableFixedWithCols() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(false);
-        try {
-            createMultiCFTestTable(TestUtil.DEFAULT_DATA_TABLE_FULL_NAME);
-            populateMultiCFTestTable(TestUtil.DEFAULT_DATA_TABLE_FULL_NAME);
-            String ddl = null;
-            if(localIndex){
-                ddl = "CREATE LOCAL INDEX " + TestUtil.DEFAULT_INDEX_TABLE_NAME + " ON " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME
-                        + " (char_col1 ASC, int_col1 ASC)"
-                        + " INCLUDE (long_col1, long_col2)";
-            } else {
-                ddl = "CREATE INDEX " + TestUtil.DEFAULT_INDEX_TABLE_NAME + " ON " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME
-                        + " (char_col1 ASC, int_col1 ASC)"
-                        + " INCLUDE (long_col1, long_col2)";
-            }
-            PreparedStatement stmt = conn.prepareStatement(ddl);
-            stmt.execute();
-            
-            String query = "SELECT d.char_col1, int_col1 from " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " as d";
-            ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + query);
-            if (localIndex) {
-                assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + MetaDataUtil.getLocalIndexTableName(TestUtil.DEFAULT_DATA_TABLE_FULL_NAME)+" [-32768]\n"
-                           + "    SERVER FILTER BY FIRST KEY ONLY\n"
-                           + "CLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
-            } else {
-                assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + TestUtil.DEFAULT_INDEX_TABLE_FULL_NAME + "\n" +
-                             "    SERVER FILTER BY FIRST KEY ONLY", QueryUtil.getExplainPlan(rs));
-            }
-            
-            rs = conn.createStatement().executeQuery(query);
-            assertTrue(rs.next());
-            assertEquals("chara", rs.getString(1));
-            assertEquals("chara", rs.getString("char_col1"));
-            assertEquals(2, rs.getInt(2));
-            assertTrue(rs.next());
-            assertEquals("chara", rs.getString(1));
-            assertEquals(3, rs.getInt(2));
-            assertTrue(rs.next());
-            assertEquals("chara", rs.getString(1));
-            assertEquals(4, rs.getInt(2));
-            assertFalse(rs.next());
-        } finally {
-            conn.close();
-        }
-    }
-
-    @Test
-    public void testIndexWithNullableDateCol() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(false);
-        try {
-            Date date = new Date(System.currentTimeMillis());
-            
-            createMultiCFTestTable(TestUtil.DEFAULT_DATA_TABLE_FULL_NAME);
-            populateMultiCFTestTable(TestUtil.DEFAULT_DATA_TABLE_FULL_NAME, date);
-            String ddl = null;
-            if (localIndex) {
-                ddl = "CREATE LOCAL INDEX " + TestUtil.DEFAULT_INDEX_TABLE_NAME + " ON " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " (date_col)";
-            } else {
-                ddl = "CREATE INDEX " + TestUtil.DEFAULT_INDEX_TABLE_NAME + " ON " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " (date_col)";
-            }
-            PreparedStatement stmt = conn.prepareStatement(ddl);
-            stmt.execute();
-            
-            String query = "SELECT int_pk from " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME ;
-            ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + query);
-            if (localIndex) {
-                assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME +" [-32768]\n"
-                           + "    SERVER FILTER BY FIRST KEY ONLY\n"
-                           + "CLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
-            } else {
-                assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + TestUtil.DEFAULT_INDEX_TABLE_FULL_NAME + "\n"
-                        + "    SERVER FILTER BY FIRST KEY ONLY", QueryUtil.getExplainPlan(rs));
-            }
-            
-            rs = conn.createStatement().executeQuery(query);
-            assertTrue(rs.next());
-            assertEquals(2, rs.getInt(1));
-            assertTrue(rs.next());
-            assertEquals(1, rs.getInt(1));
-            assertTrue(rs.next());
-            assertEquals(3, rs.getInt(1));
-            assertFalse(rs.next());
-            
-            query = "SELECT date_col from " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " order by date_col" ;
-            rs = conn.createStatement().executeQuery("EXPLAIN " + query);
-            if (localIndex) {
-                assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " [-32768]\n"
-                        + "    SERVER FILTER BY FIRST KEY ONLY\n"
-                        + "CLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
-            } else {
-                assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + TestUtil.DEFAULT_INDEX_TABLE_FULL_NAME + "\n"
-                        + "    SERVER FILTER BY FIRST KEY ONLY", QueryUtil.getExplainPlan(rs));
-            }
-            
-            rs = conn.createStatement().executeQuery(query);
-            assertTrue(rs.next());
-            assertEquals(date, rs.getDate(1));
-            assertTrue(rs.next());
-            assertEquals(new Date(date.getTime() + TestUtil.MILLIS_IN_DAY), rs.getDate(1));
-            assertTrue(rs.next());
-            assertEquals(new Date(date.getTime() + 2 * TestUtil.MILLIS_IN_DAY), rs.getDate(1));
-            assertFalse(rs.next());
-        } finally {
-            conn.close();
-        }
-    }
-
-    @Test
-    public void testCoveredColumnUpdates() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(false);
-        try {
-            createMultiCFTestTable(TestUtil.DEFAULT_DATA_TABLE_FULL_NAME);
-            populateMultiCFTestTable(TestUtil.DEFAULT_DATA_TABLE_FULL_NAME);
-            String ddl = null;
-            if(localIndex) {
-                ddl = "CREATE LOCAL INDEX " + TestUtil.DEFAULT_INDEX_TABLE_NAME + " ON " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME
-                        + " (char_col1 ASC, int_col1 ASC)"
-                        + " INCLUDE (long_col1, long_col2)";
-            } else {
-                ddl = "CREATE INDEX " + TestUtil.DEFAULT_INDEX_TABLE_NAME + " ON " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME
-                        + " (char_col1 ASC, int_col1 ASC)"
-                        + " INCLUDE (long_col1, long_col2)";
-            }
-
-            PreparedStatement stmt = conn.prepareStatement(ddl);
-            stmt.execute();
-            
-            String query = "SELECT char_col1, int_col1, long_col2 from " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME;
-            ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + query);
-            if (localIndex) {
-                assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME +" [-32768]\nCLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
-            } else {
-                assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + TestUtil.DEFAULT_INDEX_TABLE_FULL_NAME, QueryUtil.getExplainPlan(rs));
-            }
-            
-            rs = conn.createStatement().executeQuery(query);
-            assertTrue(rs.next());
-            assertEquals("chara", rs.getString(1));
-            assertEquals(2, rs.getInt(2));
-            assertEquals(3L, rs.getLong(3));
-            assertTrue(rs.next());
-            assertEquals("chara", rs.getString(1));
-            assertEquals(3, rs.getInt(2));
-            assertEquals(4L, rs.getLong(3));
-            assertTrue(rs.next());
-            assertEquals("chara", rs.getString(1));
-            assertEquals(4, rs.getInt(2));
-            assertEquals(5L, rs.getLong(3));
-            assertFalse(rs.next());
-            
-            stmt = conn.prepareStatement("UPSERT INTO " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME
-                    + "(varchar_pk, char_pk, int_pk, long_pk , decimal_pk, long_col2) SELECT varchar_pk, char_pk, int_pk, long_pk , decimal_pk, long_col2*2 FROM "
-                    + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " WHERE long_col2=?");
-            stmt.setLong(1,4L);
-            assertEquals(1,stmt.executeUpdate());
-            conn.commit();
-
-            rs = conn.createStatement().executeQuery(query);
-            assertTrue(rs.next());
-            assertEquals("chara", rs.getString(1));
-            assertEquals(2, rs.getInt(2));
-            assertEquals(3L, rs.getLong(3));
-            assertTrue(rs.next());
-            assertEquals("chara", rs.getString(1));
-            assertEquals(3, rs.getInt(2));
-            assertEquals(8L, rs.getLong(3));
-            assertTrue(rs.next());
-            assertEquals("chara", rs.getString(1));
-            assertEquals(4, rs.getInt(2));
-            assertEquals(5L, rs.getLong(3));
-            assertFalse(rs.next());
-            
-            stmt = conn.prepareStatement("UPSERT INTO " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME
-                    + "(varchar_pk, char_pk, int_pk, long_pk , decimal_pk, long_col2) SELECT varchar_pk, char_pk, int_pk, long_pk , decimal_pk, null FROM "
-                    + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " WHERE long_col2=?");
-            stmt.setLong(1,3L);
-            assertEquals(1,stmt.executeUpdate());
-            conn.commit();
-            
-            rs = conn.createStatement().executeQuery(query);
-            assertTrue(rs.next());
-            assertEquals("chara", rs.getString(1));
-            assertEquals(2, rs.getInt(2));
-            assertEquals(0, rs.getLong(3));
-            assertTrue(rs.wasNull());
-            assertTrue(rs.next());
-            assertEquals("chara", rs.getString(1));
-            assertEquals(3, rs.getInt(2));
-            assertEquals(8L, rs.getLong(3));
-            assertTrue(rs.next());
-            assertEquals("chara", rs.getString(1));
-            assertEquals(4, rs.getInt(2));
-            assertEquals(5L, rs.getLong(3));
-            assertFalse(rs.next());
-            if(localIndex) {
-                query = "SELECT b.* from " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " where int_col1 = 4";
-                rs = conn.createStatement().executeQuery("EXPLAIN " + query);
-                assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME +" [-32768]\n" +
-                		"    SERVER FILTER BY TO_INTEGER(\"INT_COL1\") = 4\nCLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
-                rs = conn.createStatement().executeQuery(query);
-                assertTrue(rs.next());
-                assertEquals("varchar_b", rs.getString(1));
-                assertEquals("charb", rs.getString(2));
-                assertEquals(5, rs.getInt(3));
-                assertEquals(5, rs.getLong(4));
-                assertFalse(rs.next());
-                
-            }
-        } finally {
-            conn.close();
-        }
-    }
-    
-    @Test
-    public void testSelectAllAndAliasWithIndex() throws Exception {
-        String query;
-        ResultSet rs;
-        
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(false);
-        conn.createStatement().execute("CREATE TABLE " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
-        query = "SELECT * FROM " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertFalse(rs.next());
-        
-        if (localIndex) {
-            conn.createStatement().execute("CREATE LOCAL INDEX " + TestUtil.DEFAULT_INDEX_TABLE_NAME + " ON " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " (v2 DESC) INCLUDE (v1)");
-        } else {
-            conn.createStatement().execute("CREATE INDEX " + TestUtil.DEFAULT_INDEX_TABLE_NAME + " ON " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " (v2 DESC) INCLUDE (v1)");
-        }
-        query = "SELECT * FROM " + TestUtil.DEFAULT_INDEX_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertFalse(rs.next());
-
-        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
-        stmt.setString(1,"a");
-        stmt.setString(2, "x");
-        stmt.setString(3, "1");
-        stmt.execute();
-        stmt.setString(1,"b");
-        stmt.setString(2, "y");
-        stmt.setString(3, "2");
-        stmt.execute();
-        conn.commit();
-        
-        query = "SELECT * FROM " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
-        if(localIndex){
-            assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME+" [-32768]\nCLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
-        } else {
-            assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + TestUtil.DEFAULT_INDEX_TABLE_FULL_NAME, QueryUtil.getExplainPlan(rs));
-        }
-
-        rs = conn.createStatement().executeQuery(query);
-        assertTrue(rs.next());
-        assertEquals("b",rs.getString(1));
-        assertEquals("y",rs.getString(2));
-        assertEquals("2",rs.getString(3));
-        assertEquals("b",rs.getString("k"));
-        assertEquals("y",rs.getString("v1"));
-        assertEquals("2",rs.getString("v2"));
-        assertTrue(rs.next());
-        assertEquals("a",rs.getString(1));
-        assertEquals("x",rs.getString(2));
-        assertEquals("1",rs.getString(3));
-        assertEquals("a",rs.getString("k"));
-        assertEquals("x",rs.getString("v1"));
-        assertEquals("1",rs.getString("v2"));
-        assertFalse(rs.next());
-        
-        query = "SELECT v1 as foo FROM " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " WHERE v2 = '1' ORDER BY foo";
-        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
-        if(localIndex){
-            assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" +TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " [-32768,~'1']\n" + 
-                    "    SERVER SORTED BY [\"V1\"]\n" + 
-                    "CLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
-        } else {
-            assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER " +TestUtil.DEFAULT_INDEX_TABLE_FULL_NAME + " [~'1']\n" + 
-                    "    SERVER SORTED BY [\"V1\"]\n" + 
-                    "CLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
-        }
-
-        rs = conn.createStatement().executeQuery(query);
-        assertTrue(rs.next());
-        assertEquals("x",rs.getString(1));
-        assertEquals("x",rs.getString("foo"));
-        assertFalse(rs.next());
-    }
-    
-    @Test
-    public void testSelectCF() throws Exception {
-        String query;
-        ResultSet rs;
-        
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(false);
-        conn.createStatement().execute("CREATE TABLE " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, a.v1 VARCHAR, a.v2 VARCHAR, b.v1 VARCHAR)  ");
-        query = "SELECT * FROM " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertFalse(rs.next());
-        if(localIndex) {
-            conn.createStatement().execute("CREATE LOCAL INDEX " + TestUtil.DEFAULT_INDEX_TABLE_NAME + " ON " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " (v2 DESC) INCLUDE (a.v1)");
-        } else {
-            conn.createStatement().execute("CREATE INDEX " + TestUtil.DEFAULT_INDEX_TABLE_NAME + " ON " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " (v2 DESC) INCLUDE (a.v1)");
-        }
-        query = "SELECT * FROM " + TestUtil.DEFAULT_INDEX_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertFalse(rs.next());
-
-        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " VALUES(?,?,?,?)");
-        stmt.setString(1,"a");
-        stmt.setString(2, "x");
-        stmt.setString(3, "1");
-        stmt.setString(4, "A");
-        stmt.execute();
-        stmt.setString(1,"b");
-        stmt.setString(2, "y");
-        stmt.setString(3, "2");
-        stmt.setString(4, "B");
-        stmt.execute();
-        conn.commit();
-        
-        query = "SELECT * FROM " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
-        assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME, QueryUtil.getExplainPlan(rs));
-
-        query = "SELECT a.* FROM " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
-        if(localIndex) {
-            assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME+" [-32768]\nCLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
-        } else {
-            assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + TestUtil.DEFAULT_INDEX_TABLE_FULL_NAME, QueryUtil.getExplainPlan(rs));
-        }
-        rs = conn.createStatement().executeQuery(query);
-        assertTrue(rs.next());
-        assertEquals("y",rs.getString(1));
-        assertEquals("2",rs.getString(2));
-        assertEquals("y",rs.getString("v1"));
-        assertEquals("2",rs.getString("v2"));
-        assertTrue(rs.next());
-        assertEquals("x",rs.getString(1));
-        assertEquals("1",rs.getString(2));
-        assertEquals("x",rs.getString("v1"));
-        assertEquals("1",rs.getString("v2"));
-        assertFalse(rs.next());
-    }
-    
-    @Test
-    public void testCoveredColumns() throws Exception {
-        String query;
-        ResultSet rs;
-        
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(false);
-        conn.createStatement().execute("CREATE TABLE " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
-        query = "SELECT * FROM " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertFalse(rs.next());
-        
-        if(localIndex) {
-            conn.createStatement().execute("CREATE LOCAL INDEX " + TestUtil.DEFAULT_INDEX_TABLE_NAME + " ON " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)");
-        } else {
-            conn.createStatement().execute("CREATE INDEX " + TestUtil.DEFAULT_INDEX_TABLE_NAME + " ON " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)");
-        }
-        query = "SELECT * FROM " + TestUtil.DEFAULT_INDEX_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertFalse(rs.next());
-
-        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
-        stmt.setString(1,"a");
-        stmt.setString(2, "x");
-        stmt.setString(3, "1");
-        stmt.execute();
-        conn.commit();
-        
-        query = "SELECT * FROM " + TestUtil.DEFAULT_INDEX_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertTrue(rs.next());
-        assertEquals("x",rs.getString(1));
-        assertEquals("a",rs.getString(2));
-        assertEquals("1",rs.getString(3));
-        assertFalse(rs.next());
-
-        stmt = conn.prepareStatement("UPSERT INTO " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + "(k,v2) VALUES(?,?)");
-        stmt.setString(1,"a");
-        stmt.setString(2, null);
-        stmt.execute();
-        conn.commit();
-        
-        query = "SELECT * FROM " + TestUtil.DEFAULT_INDEX_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertTrue(rs.next());
-        assertEquals("x",rs.getString(1));
-        assertEquals("a",rs.getString(2));
-        assertNull(rs.getString(3));
-        assertFalse(rs.next());
-
-        query = "SELECT * FROM " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
-        if(localIndex) {
-            assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME+" [-32768]\nCLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));            
-        } else {
-            assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + TestUtil.DEFAULT_INDEX_TABLE_FULL_NAME, QueryUtil.getExplainPlan(rs));
-        }
-
-        rs = conn.createStatement().executeQuery(query);
-        assertTrue(rs.next());
-        assertEquals("a",rs.getString(1));
-        assertEquals("x",rs.getString(2));
-        assertNull(rs.getString(3));
-        assertFalse(rs.next());
-
-        stmt = conn.prepareStatement("UPSERT INTO " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + "(k,v2) VALUES(?,?)");
-        stmt.setString(1,"a");
-        stmt.setString(2,"3");
-        stmt.execute();
-        conn.commit();
-        
-        query = "SELECT * FROM " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
-        if(localIndex) {
-            assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " [-32768]\nCLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));            
-        } else {
-            assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + TestUtil.DEFAULT_INDEX_TABLE_FULL_NAME, QueryUtil.getExplainPlan(rs));
-        }
-        
-        rs = conn.createStatement().executeQuery(query);
-        assertTrue(rs.next());
-        assertEquals("a",rs.getString(1));
-        assertEquals("x",rs.getString(2));
-        assertEquals("3",rs.getString(3));
-        assertFalse(rs.next());
-
-        stmt = conn.prepareStatement("UPSERT INTO " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + "(k,v2) VALUES(?,?)");
-        stmt.setString(1,"a");
-        stmt.setString(2,"4");
-        stmt.execute();
-        conn.commit();
-        
-        query = "SELECT * FROM " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
-        if(localIndex) {
-            assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME+" [-32768]\nCLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));            
-        } else {
-            assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + TestUtil.DEFAULT_INDEX_TABLE_FULL_NAME, QueryUtil.getExplainPlan(rs));
-        }
-        
-        rs = conn.createStatement().executeQuery(query);
-        assertTrue(rs.next());
-        assertEquals("a",rs.getString(1));
-        assertEquals("x",rs.getString(2));
-        assertEquals("4",rs.getString(3));
-        assertFalse(rs.next());
-    }
-
-    @Test
-    public void testCompoundIndexKey() throws Exception {
-        String query;
-        ResultSet rs;
-        
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(false);
-
-        // make sure that the tables are empty, but reachable
-        conn.createStatement().execute("CREATE TABLE " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
-        query = "SELECT * FROM " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertFalse(rs.next());
-        if(localIndex) {
-            conn.createStatement().execute("CREATE LOCAL INDEX " + TestUtil.DEFAULT_INDEX_TABLE_NAME + " ON " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " (v1, v2)");
-        } else {
-            conn.createStatement().execute("CREATE INDEX " + TestUtil.DEFAULT_INDEX_TABLE_NAME + " ON " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " (v1, v2)");
-        }
-        query = "SELECT * FROM " + TestUtil.DEFAULT_INDEX_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertFalse(rs.next());
-
-        // load some data into the table
-        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
-        stmt.setString(1,"a");
-        stmt.setString(2, "x");
-        stmt.setString(3, "1");
-        stmt.execute();
-        conn.commit();
-        
-        query = "SELECT * FROM " + TestUtil.DEFAULT_INDEX_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertTrue(rs.next());
-        assertEquals("x",rs.getString(1));
-        assertEquals("1",rs.getString(2));
-        assertEquals("a",rs.getString(3));
-        assertFalse(rs.next());
-
-        stmt = conn.prepareStatement("UPSERT INTO " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
-        stmt.setString(1,"a");
-        stmt.setString(2, "y");
-        stmt.setString(3, null);
-        stmt.execute();
-        conn.commit();
-        
-        query = "SELECT * FROM " + TestUtil.DEFAULT_INDEX_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertTrue(rs.next());
-        assertEquals("y",rs.getString(1));
-        assertNull(rs.getString(2));
-        assertEquals("a",rs.getString(3));
-        assertFalse(rs.next());
-
-        query = "SELECT * FROM " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
-        if (localIndex) {
-            assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME+" [-32768]\n"
-                    + "    SERVER FILTER BY FIRST KEY ONLY\n"
-                    + "CLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
-        } else {
-            assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + TestUtil.DEFAULT_INDEX_TABLE_FULL_NAME + "\n"
-                       + "    SERVER FILTER BY FIRST KEY ONLY", QueryUtil.getExplainPlan(rs));
-        }
-        //make sure the data table looks like what we expect
-        rs = conn.createStatement().executeQuery(query);
-        assertTrue(rs.next());
-        assertEquals("a",rs.getString(1));
-        assertEquals("y",rs.getString(2));
-        assertNull(rs.getString(3));
-        assertFalse(rs.next());
-        
-        // Upsert new row with null leading index column
-        stmt.setString(1,"b");
-        stmt.setString(2, null);
-        stmt.setString(3, "3");
-        stmt.execute();
-        conn.commit();
-        
-        query = "SELECT * FROM " + TestUtil.DEFAULT_INDEX_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertTrue(rs.next());
-        assertEquals(null,rs.getString(1));
-        assertEquals("3",rs.getString(2));
-        assertEquals("b",rs.getString(3));
-        assertTrue(rs.next());
-        assertEquals("y",rs.getString(1));
-        assertNull(rs.getString(2));
-        assertEquals("a",rs.getString(3));
-        assertFalse(rs.next());
-
-        // Update row with null leading index column to have a value
-        stmt = conn.prepareStatement("UPSERT INTO " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " VALUES(?,?)");
-        stmt.setString(1,"b");
-        stmt.setString(2, "z");
-        stmt.execute();
-        conn.commit();
-        
-        query = "SELECT * FROM " + TestUtil.DEFAULT_INDEX_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertTrue(rs.next());
-        assertEquals("y",rs.getString(1));
-        assertNull(rs.getString(2));
-        assertEquals("a",rs.getString(3));
-        assertTrue(rs.next());
-        assertEquals("z",rs.getString(1));
-        assertEquals("3",rs.getString(2));
-        assertEquals("b",rs.getString(3));
-        assertFalse(rs.next());
-
-    }
- 
-    /**
-     * There was a case where if there were multiple updates to a single row in the same batch, the
-     * index wouldn't be updated correctly as each element of the batch was evaluated with the state
-     * previous to the batch, rather than with the rest of the batch. This meant you could do a put
-     * and a delete on a row in the same batch and the index result would contain the current + put
-     * and current + delete, but not current + put + delete.
-     * @throws Exception on failure
-     */
-    @Test
-    public void testMultipleUpdatesToSingleRow() throws Exception {
-        String query;
-        ResultSet rs;
-    
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(false);
-    
-        // make sure that the tables are empty, but reachable
-        conn.createStatement().execute(
-          "CREATE TABLE " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME
-              + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
-        query = "SELECT * FROM " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertFalse(rs.next());
-
-        if(localIndex) {
-            conn.createStatement().execute(
-                "CREATE LOCAL INDEX " + TestUtil.DEFAULT_INDEX_TABLE_NAME + " ON " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " (v1, v2)");
-        } else {
-            conn.createStatement().execute(
-                "CREATE INDEX " + TestUtil.DEFAULT_INDEX_TABLE_NAME + " ON " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " (v1, v2)");
-        }
-        query = "SELECT * FROM " + TestUtil.DEFAULT_INDEX_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertFalse(rs.next());
-    
-        // load some data into the table
-        PreparedStatement stmt =
-            conn.prepareStatement("UPSERT INTO " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
-        stmt.setString(1, "a");
-        stmt.setString(2, "x");
-        stmt.setString(3, "1");
-        stmt.execute();
-        conn.commit();
-        
-        // make sure the index is working as expected
-        query = "SELECT * FROM " + TestUtil.DEFAULT_INDEX_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertTrue(rs.next());
-        assertEquals("x", rs.getString(1));
-        assertEquals("1", rs.getString(2));
-        assertEquals("a", rs.getString(3));
-        assertFalse(rs.next());
-      
-        // do multiple updates to the same row, in the same batch
-        stmt = conn.prepareStatement("UPSERT INTO " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + "(k, v1) VALUES(?,?)");
-        stmt.setString(1, "a");
-        stmt.setString(2, "y");
-        stmt.execute();
-        stmt = conn.prepareStatement("UPSERT INTO " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + "(k,v2) VALUES(?,?)");
-        stmt.setString(1, "a");
-        stmt.setString(2, null);
-        stmt.execute();
-        conn.commit();
-    
-        query = "SELECT * FROM " + TestUtil.DEFAULT_INDEX_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertTrue(rs.next());
-        assertEquals("y", rs.getString(1));
-        assertNull(rs.getString(2));
-        assertEquals("a", rs.getString(3));
-        assertFalse(rs.next());
-    
-        query = "SELECT * FROM " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
-        if(localIndex) {
-            assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME+" [-32768]\n"
-                    + "    SERVER FILTER BY FIRST KEY ONLY\n"
-                    + "CLIENT MERGE SORT",
-                QueryUtil.getExplainPlan(rs));
-        } else {
-            assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + TestUtil.DEFAULT_INDEX_TABLE_FULL_NAME + "\n"
-                    + "    SERVER FILTER BY FIRST KEY ONLY",
-                QueryUtil.getExplainPlan(rs));
-        }
-    
-        // check that the data table matches as expected
-        rs = conn.createStatement().executeQuery(query);
-        assertTrue(rs.next());
-        assertEquals("a", rs.getString(1));
-        assertEquals("y", rs.getString(2));
-        assertNull(rs.getString(3));
-        assertFalse(rs.next());
-    }
-
-    @Test
-    public void testUpsertAfterIndexDrop() throws Exception {
-        String query;
-        ResultSet rs;
-    
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(false);
-    
-        // make sure that the tables are empty, but reachable
-        conn.createStatement().execute(
-          "CREATE TABLE " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME
-              + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
-        query = "SELECT * FROM " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertFalse(rs.next());
-    
-        conn.createStatement().execute(
-          "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + TestUtil.DEFAULT_INDEX_TABLE_NAME + " ON " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " (v1, v2)");
-        query = "SELECT * FROM " + TestUtil.DEFAULT_INDEX_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertFalse(rs.next());
-    
-        // load some data into the table
-        PreparedStatement stmt =
-            conn.prepareStatement("UPSERT INTO " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
-        stmt.setString(1, "a");
-        stmt.setString(2, "x");
-        stmt.setString(3, "1");
-        stmt.execute();
-        conn.commit();
-        
-        // make sure the index is working as expected
-        query = "SELECT * FROM " + TestUtil.DEFAULT_INDEX_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertTrue(rs.next());
-        assertEquals("x", rs.getString(1));
-        assertEquals("1", rs.getString(2));
-        assertEquals("a", rs.getString(3));
-        assertFalse(rs.next());
-
-        String ddl = "DROP INDEX " + TestUtil.DEFAULT_INDEX_TABLE_NAME + " ON " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME;
-        stmt = conn.prepareStatement(ddl);
-        stmt.execute();
-        
-        stmt = conn.prepareStatement("UPSERT INTO " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + "(k, v1) VALUES(?,?)");
-        stmt.setString(1, "a");
-        stmt.setString(2, "y");
-        stmt.execute();
-        conn.commit();
-    
-        query = "SELECT * FROM " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME;
-    
-        // check that the data table matches as expected
-        rs = conn.createStatement().executeQuery(query);
-        assertTrue(rs.next());
-        assertEquals("a", rs.getString(1));
-        assertEquals("y", rs.getString(2));
-        assertFalse(rs.next());
-    }
-    
-    @Test
-    public void testMultipleUpdatesAcrossRegions() throws Exception {
-        String query;
-        ResultSet rs;
-    
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(false);
-    
-        // make sure that the tables are empty, but reachable
-        conn.createStatement().execute(
-          "CREATE TABLE " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME
-              + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " + HTableDescriptor.MAX_FILESIZE + "=1, " + HTableDescriptor.MEMSTORE_FLUSHSIZE + "=1 " +
-                  "SPLIT ON ('b')");
-        query = "SELECT * FROM " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertFalse(rs.next());
-
-        if(localIndex) {
-            conn.createStatement().execute(
-                "CREATE LOCAL INDEX " + TestUtil.DEFAULT_INDEX_TABLE_NAME + " ON " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " (v1, v2)");
-        } else {
-            conn.createStatement().execute(
-                "CREATE INDEX " + TestUtil.DEFAULT_INDEX_TABLE_NAME + " ON " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " (v1, v2)");
-        }
-        query = "SELECT * FROM " + TestUtil.DEFAULT_INDEX_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertFalse(rs.next());
-    
-        // load some data into the table
-        PreparedStatement stmt =
-            conn.prepareStatement("UPSERT INTO " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
-        stmt.setString(1, "a");
-        stmt.setString(2, "x");
-        stmt.setString(3, "1");
-        stmt.execute();
-        stmt.setString(1, "b");
-        stmt.setString(2, "y");
-        stmt.setString(3, "2");
-        stmt.execute();
-        stmt.setString(1, "c");
-        stmt.setString(2, "z");
-        stmt.setString(3, "3");
-        stmt.execute();
-        conn.commit();
-        
-        // make sure the index is working as expected
-        query = "SELECT * FROM " + TestUtil.DEFAULT_INDEX_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertTrue(rs.next());
-        assertEquals("x", rs.getString(1));
-        assertEquals("1", rs.getString(2));
-        assertEquals("a", rs.getString(3));
-        assertTrue(rs.next());
-        assertEquals("y", rs.getString(1));
-        assertEquals("2", rs.getString(2));
-        assertEquals("b", rs.getString(3));
-        assertTrue(rs.next());
-        assertEquals("z", rs.getString(1));
-        assertEquals("3", rs.getString(2));
-        assertEquals("c", rs.getString(3));
-        assertFalse(rs.next());
-
-        query = "SELECT * FROM " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
-        if (localIndex) {
-            assertEquals("CLIENT PARALLEL 2-WAY RANGE SCAN OVER _LOCAL_IDX_" + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME+" [-32768]\n"
-                       + "    SERVER FILTER BY FIRST KEY ONLY\n"
-                       + "CLIENT MERGE SORT",
-                QueryUtil.getExplainPlan(rs));
-        } else {
-            assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + TestUtil.DEFAULT_INDEX_TABLE_FULL_NAME + "\n"
-                    + "    SERVER FILTER BY FIRST KEY ONLY",
-                QueryUtil.getExplainPlan(rs));
-        }
-    
-        // check that the data table matches as expected
-        rs = conn.createStatement().executeQuery(query);
-        assertTrue(rs.next());
-        assertEquals("a", rs.getString(1));
-        assertEquals("x", rs.getString(2));
-        assertEquals("1", rs.getString(3));
-        assertTrue(rs.next());
-        assertEquals("b", rs.getString(1));
-        assertEquals("y", rs.getString(2));
-        assertEquals("2", rs.getString(3));
-        assertTrue(rs.next());
-        assertEquals("c", rs.getString(1));
-        assertEquals("z", rs.getString(2));
-        assertEquals("3", rs.getString(3));
-        assertFalse(rs.next());
-    }
-    
-    @Test
-    public void testIndexWithCaseSensitiveCols() throws Exception {
-        String query;
-        ResultSet rs;
-        
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(false);
-        try {
-            conn.createStatement().execute("CREATE TABLE cs (k VARCHAR NOT NULL PRIMARY KEY, \"V1\" VARCHAR, \"v2\" VARCHAR)");
-            query = "SELECT * FROM cs";
-            rs = conn.createStatement().executeQuery(query);
-            assertFalse(rs.next());
-            if (localIndex) {
-                conn.createStatement().execute("CREATE LOCAL INDEX ics ON cs (\"v2\") INCLUDE (\"V1\")");
-            } else {
-                conn.createStatement().execute("CREATE INDEX ics ON cs (\"v2\") INCLUDE (\"V1\")");
-            }
-            query = "SELECT * FROM ics";
-            rs = conn.createStatement().executeQuery(query);
-            assertFalse(rs.next());
-
-            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO cs VALUES(?,?,?)");
-            stmt.setString(1,"a");
-            stmt.setString(2, "x");
-            stmt.setString(3, "1");
-            stmt.execute();
-            stmt.setString(1,"b");
-            stmt.setString(2, "y");
-            stmt.setString(3, "2");
-            stmt.execute();
-            conn.commit();
-
-            query = "SELECT * FROM cs WHERE \"v2\" = '1'";
-            rs = conn.createStatement().executeQuery("EXPLAIN " + query);
-            if(localIndex){
-                assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_CS [-32768,'1']\n"
-                           + "CLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
-            } else {
-                assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER ICS ['1']", QueryUtil.getExplainPlan(rs));
-            }
-
-            rs = conn.createStatement().executeQuery(query);
-            assertTrue(rs.next());
-            assertEquals("a",rs.getString(1));
-            assertEquals("x",rs.getString(2));
-            assertEquals("1",rs.getString(3));
-            assertEquals("a",rs.getString("k"));
-            assertEquals("x",rs.getString("V1"));
-            assertEquals("1",rs.getString("v2"));
-            assertFalse(rs.next());
-
-            query = "SELECT \"V1\", \"V1\" as foo1, \"v2\" as foo, \"v2\" as \"Foo1\", \"v2\" FROM cs ORDER BY foo";
-            rs = conn.createStatement().executeQuery("EXPLAIN " + query);
-            if(localIndex){
-                assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_CS [-32768]\nCLIENT MERGE SORT",
-                    QueryUtil.getExplainPlan(rs));
-            } else {
-                assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER ICS", QueryUtil.getExplainPlan(rs));
-            }
-
-            rs = conn.createStatement().executeQuery(query);
-            assertTrue(rs.next());
-            assertEquals("x",rs.getString(1));
-            assertEquals("x",rs.getString("V1"));
-            assertEquals("x",rs.getString(2));
-            assertEquals("x",rs.getString("foo1"));
-            assertEquals("1",rs.getString(3));
-            assertEquals("1",rs.getString("Foo"));
-            assertEquals("1",rs.getString(4));
-            assertEquals("1",rs.getString("Foo1"));
-            assertEquals("1",rs.getString(5));
-            assertEquals("1",rs.getString("v2"));
-            assertTrue(rs.next());
-            assertEquals("y",rs.getString(1));
-            assertEquals("y",rs.getString("V1"));
-            assertEquals("y",rs.getString(2));
-            assertEquals("y",rs.getString("foo1"));
-            assertEquals("2",rs.getString(3));
-            assertEquals("2",rs.getString("Foo"));
-            assertEquals("2",rs.getString(4));
-            assertEquals("2",rs.getString("Foo1"));
-            assertEquals("2",rs.getString(5));
-            assertEquals("2",rs.getString("v2"));
-            assertFalse(rs.next());
-        } finally {
-            conn.close();
-        }
-    }
-
-    @Test
-    public void testInFilterOnIndexedTable() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(false);
-        try {
-	        String ddl = "CREATE TABLE TEST (PK1 CHAR(2) NOT NULL PRIMARY KEY, CF1.COL1 BIGINT)";
-	        conn.createStatement().execute(ddl);
-	        if(localIndex) {
-	            ddl = "CREATE LOCAL INDEX IDX1 ON TEST (COL1)";
-	        } else {
-	            ddl = "CREATE INDEX IDX1 ON TEST (COL1)";
-	        }
-	        conn.createStatement().execute(ddl);
-	
-	        String query = "SELECT COUNT(COL1) FROM TEST WHERE COL1 IN (1,25,50,75,100)"; 
-	        ResultSet rs = conn.createStatement().executeQuery(query);
-	        assertTrue(rs.next());
-        } finally {
-            conn.close();
-        }
-    }
-
-    @Test
-    public void testIndexWithDecimalCol() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(false);
-        try {
-            Date date = new Date(System.currentTimeMillis());
-            
-            createMultiCFTestTable(TestUtil.DEFAULT_DATA_TABLE_FULL_NAME);
-            populateMultiCFTestTable(TestUtil.DEFAULT_DATA_TABLE_FULL_NAME, date);
-            String ddl = null;
-            if (localIndex) {
-                ddl = "CREATE LOCAL INDEX " + TestUtil.DEFAULT_INDEX_TABLE_NAME + " ON " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " (decimal_pk) INCLUDE (decimal_col1, decimal_col2)";
-            } else {
-                ddl = "CREATE INDEX " + TestUtil.DEFAULT_INDEX_TABLE_NAME + " ON " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " (decimal_pk) INCLUDE (decimal_col1, decimal_col2)";
-            }
-            PreparedStatement stmt = conn.prepareStatement(ddl);
-            stmt.execute();
-            
-            String query = "SELECT decimal_pk, decimal_col1, decimal_col2 from " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME ;
-            ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + query);
-            if(localIndex) {
-                assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME+" [-32768]\nCLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
-            } else {
-                assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + TestUtil.DEFAULT_INDEX_TABLE_FULL_NAME, QueryUtil.getExplainPlan(rs));
-            }
-            
-            rs = conn.createStatement().executeQuery(query);
-            assertTrue(rs.next());
-            assertEquals(new BigDecimal("1.1"), rs.getBigDecimal(1));
-            assertEquals(new BigDecimal("2.1"), rs.getBigDecimal(2));
-            assertEquals(new BigDecimal("3.1"), rs.getBigDecimal(3));
-            assertTrue(rs.next());
-            assertEquals(new BigDecimal("2.2"), rs.getBigDecimal(1));
-            assertEquals(new BigDecimal("3.2"), rs.getBigDecimal(2));
-            assertEquals(new BigDecimal("4.2"), rs.getBigDecimal(3));
-            assertTrue(rs.next());
-            assertEquals(new BigDecimal("3.3"), rs.getBigDecimal(1));
-            assertEquals(new BigDecimal("4.3"), rs.getBigDecimal(2));
-            assertEquals(new BigDecimal("5.3"), rs.getBigDecimal(3));
-            assertFalse(rs.next());
-        } finally {
-            conn.close();
-        }
-    }
-    
-
-    @Test
-    public void testUpsertingNullForIndexedColumns() throws Exception {
-    	Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-    	Connection conn = DriverManager.getConnection(getUrl(), props);
-    	conn.setAutoCommit(false);
-    	try {
-    		Statement stmt = conn.createStatement();
-    		stmt.execute("CREATE TABLE DEMO(v1 VARCHAR PRIMARY KEY, v2 DOUBLE, v3 VARCHAR)");
-    		stmt.execute("CREATE " + (localIndex ? "LOCAL " : "") + "INDEX DEMO_idx ON DEMO (v2) INCLUDE(v3)");
-    		
-    		//create a row with value null for indexed column v2
-    		stmt.executeUpdate("upsert into DEMO values('cc1', null, 'abc')");
-    		conn.commit();
-    		
-    		//assert values in index table 
-    		ResultSet rs = stmt.executeQuery("select * from DEMO_IDX");
-    		assertTrue(rs.next());
-    		assertEquals(0, Doubles.compare(0, rs.getDouble(1)));
-    		assertTrue(rs.wasNull());
-    		assertEquals("cc1", rs.getString(2));
-    		assertEquals("abc", rs.getString(3));
-    		assertFalse(rs.next());
-    		
-    		//assert values in data table
-    		rs = stmt.executeQuery("select v1, v2, v3 from DEMO");
-    		assertTrue(rs.next());
-    		assertEquals("cc1", rs.getString(1));
-    		assertEquals(0, Doubles.compare(0, rs.getDouble(2)));
-    		assertTrue(rs.wasNull());
-    		assertEquals("abc", rs.getString(3));
-    		assertFalse(rs.next());
-    		
-    		//update the previously null value for indexed column v2 to a non-null value 1.23
-    		stmt.executeUpdate("upsert into DEMO values('cc1', 1.23, 'abc')");
-    		conn.commit();
-    		
-    		//assert values in data table
-    		rs = stmt.executeQuery("select /*+ NO_INDEX */ v1, v2, v3 from DEMO");
-    		assertTrue(rs.next());
-    		assertEquals("cc1", rs.getString(1));
-    		assertEquals(0, Doubles.compare(1.23, rs.getDouble(2)));
-    		assertEquals("abc", rs.getString(3));
-    		assertFalse(rs.next());
-    		
-    		//assert values in index table 
-    		rs = stmt.executeQuery("select * from DEMO_IDX");
-    		assertTrue(rs.next());
-    		assertEquals(0, Doubles.compare(1.23, rs.getDouble(1)));
-    		assertEquals("cc1", rs.getString(2));
-    		assertEquals("abc", rs.getString(3));
-    		assertFalse(rs.next());
-    		
-    		//update the value for indexed column v2 back to null
-    		stmt.executeUpdate("upsert into DEMO values('cc1', null, 'abc')");
-    		conn.commit();
-    		
-    		//assert values in index table 
-    		rs = stmt.executeQuery("select * from DEMO_IDX");
-    		assertTrue(rs.next());
-    		assertEquals(0, Doubles.compare(0, rs.getDouble(1)));
-    		assertTrue(rs.wasNull());
-    		assertEquals("cc1", rs.getString(2));
-    		assertEquals("abc", rs.getString(3));
-    		assertFalse(rs.next());
-    		
-    		//assert values in data table
-    		rs = stmt.executeQuery("select v1, v2, v3 from DEMO");
-    		assertTrue(rs.next());
-    		assertEquals("cc1", rs.getString(1));
-    		assertEquals(0, Doubles.compare(0, rs.getDouble(2)));
-    		assertEquals("abc", rs.getString(3));
-    		assertFalse(rs.next());
-    	} finally {
-    		conn.close();
-    	}
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalMutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalMutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalMutableIndexIT.java
deleted file mode 100644
index 5bacfea..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalMutableIndexIT.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.end2end.index;
-
-public class GlobalMutableIndexIT extends BaseMutableIndexIT {
-
-    public GlobalMutableIndexIT() {
-        super(false);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
index ddcc0e8..a589408 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
@@ -17,428 +17,92 @@
  */
 package org.apache.phoenix.end2end.index;
 
-import static org.apache.phoenix.util.TestUtil.INDEX_DATA_SCHEMA;
-import static org.apache.phoenix.util.TestUtil.INDEX_DATA_TABLE;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import java.math.BigDecimal;
 import java.sql.Connection;
-import java.sql.Date;
 import java.sql.DriverManager;
-import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Properties;
 
 import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
 import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.schema.PTableKey;
-import org.apache.phoenix.util.DateUtil;
+import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 
+@RunWith(Parameterized.class)
 public class ImmutableIndexIT extends BaseHBaseManagedTimeIT {
-    // Populate the test table with data.
-    private static void populateTestTable() throws SQLException {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        try {
-            String upsert = "UPSERT INTO " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + INDEX_DATA_TABLE
-                    + " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
-            PreparedStatement stmt = conn.prepareStatement(upsert);
-            stmt.setString(1, "varchar1");
-            stmt.setString(2, "char1");
-            stmt.setInt(3, 1);
-            stmt.setLong(4, 1L);
-            stmt.setBigDecimal(5, new BigDecimal(1.0));
-            Date date = DateUtil.parseDate("2015-01-01 00:00:00");
-            stmt.setDate(6, date);
-            stmt.setString(7, "varchar_a");
-            stmt.setString(8, "chara");
-            stmt.setInt(9, 2);
-            stmt.setLong(10, 2L);
-            stmt.setBigDecimal(11, new BigDecimal(2.0));
-            stmt.setDate(12, date);
-            stmt.setString(13, "varchar_b");
-            stmt.setString(14, "charb");
-            stmt.setInt(15, 3);
-            stmt.setLong(16, 3L);
-            stmt.setBigDecimal(17, new BigDecimal(3.0));
-            stmt.setDate(18, date);
-            stmt.executeUpdate();
-            
-            stmt.setString(1, "varchar2");
-            stmt.setString(2, "char2");
-            stmt.setInt(3, 2);
-            stmt.setLong(4, 2L);
-            stmt.setBigDecimal(5, new BigDecimal(2.0));
-            date = DateUtil.parseDate("2015-01-02 00:00:00");
-            stmt.setDate(6, date);
-            stmt.setString(7, "varchar_a");
-            stmt.setString(8, "chara");
-            stmt.setInt(9, 3);
-            stmt.setLong(10, 3L);
-            stmt.setBigDecimal(11, new BigDecimal(3.0));
-            stmt.setDate(12, date);
-            stmt.setString(13, "varchar_b");
-            stmt.setString(14, "charb");
-            stmt.setInt(15, 4);
-            stmt.setLong(16, 4L);
-            stmt.setBigDecimal(17, new BigDecimal(4.0));
-            stmt.setDate(18, date);
-            stmt.executeUpdate();
-            
-            stmt.setString(1, "varchar3");
-            stmt.setString(2, "char3");
-            stmt.setInt(3, 3);
-            stmt.setLong(4, 3L);
-            stmt.setBigDecimal(5, new BigDecimal(3.0));
-            date = DateUtil.parseDate("2015-01-03 00:00:00");
-            stmt.setDate(6, date);
-            stmt.setString(7, "varchar_a");
-            stmt.setString(8, "chara");
-            stmt.setInt(9, 4);
-            stmt.setLong(10, 4L);
-            stmt.setBigDecimal(11, new BigDecimal(4.0));
-            stmt.setDate(12, date);
-            stmt.setString(13, "varchar_b");
-            stmt.setString(14, "charb");
-            stmt.setInt(15, 5);
-            stmt.setLong(16, 5L);
-            stmt.setBigDecimal(17, new BigDecimal(5.0));
-            stmt.setDate(18, date);
-            stmt.executeUpdate();
-            
-            conn.commit();
-        } finally {
-            conn.close();
-        }
+	
+	private final boolean localIndex;
+	private final String tableDDLOptions;
+	
+	public ImmutableIndexIT(boolean localIndex, boolean transactional) {
+		this.localIndex = localIndex;
+		StringBuilder optionBuilder = new StringBuilder("IMMUTABLE_ROWS=true");
+		if (transactional) {
+			optionBuilder.append(", TRANSACTIONAL=true");
+		}
+		this.tableDDLOptions = optionBuilder.toString();
+	}
+	
+	@Parameters(name="localIndex = {0} , transactional = {1}")
+    public static Collection<Boolean[]> data() {
+        return Arrays.asList(new Boolean[][] {     
+                 { false, false }, { false, true }, { true, false }, { true, true }
+           });
     }
    
     @Test
-    public void testIndexWithNullableFixedWithCols() throws Exception {
-        testIndexWithNullableFixedWithCols(false);
-    }
-
-    @Test
-    public void testLocalIndexWithNullableFixedWithCols() throws Exception {
-        testIndexWithNullableFixedWithCols(true);
-    }
-
-    public void testIndexWithNullableFixedWithCols(boolean localIndex) throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(false);
-        ensureTableCreated(getUrl(), INDEX_DATA_TABLE);
-        populateTestTable();
-        String ddl = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX IDX ON " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + INDEX_DATA_TABLE
-                    + " (char_col1 ASC, int_col1 ASC)"
-                    + " INCLUDE (long_col1, long_col2)";
-        PreparedStatement stmt = conn.prepareStatement(ddl);
-        stmt.execute();
-        
-        String query = "SELECT char_col1, int_col1 from " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + INDEX_DATA_TABLE;
-        ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + query);
-        if(localIndex) {
-            assertEquals(
-                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_INDEX_TEST.INDEX_DATA_TABLE [-32768]\n" + 
-                "    SERVER FILTER BY FIRST KEY ONLY\n" +
-                "CLIENT MERGE SORT",
-                QueryUtil.getExplainPlan(rs));
-        } else {
-            assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER INDEX_TEST.IDX\n"
-                    + "    SERVER FILTER BY FIRST KEY ONLY", QueryUtil.getExplainPlan(rs));
-        }
-        
-        rs = conn.createStatement().executeQuery(query);
-        assertTrue(rs.next());
-        assertEquals("chara", rs.getString(1));
-        assertEquals(2, rs.getInt(2));
-        assertTrue(rs.next());
-        assertEquals("chara", rs.getString(1));
-        assertEquals(3, rs.getInt(2));
-        assertTrue(rs.next());
-        assertEquals("chara", rs.getString(1));
-        assertEquals(4, rs.getInt(2));
-        assertFalse(rs.next());
-        
-        conn.createStatement().execute("DROP INDEX IDX ON " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + INDEX_DATA_TABLE);
-        
-        query = "SELECT char_col1, int_col1 from " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + INDEX_DATA_TABLE;
-        rs = conn.createStatement().executeQuery(query);
-        assertTrue(rs.next());
-        
-        query = "SELECT char_col1, int_col1 from IDX ";
-        try{
-            rs = conn.createStatement().executeQuery(query);
-            fail();
-        } catch (SQLException e) {
-            assertEquals(SQLExceptionCode.TABLE_UNDEFINED.getErrorCode(), e.getErrorCode());
-        }
-        
-        
-    }
-    
-    private void assertImmutableRows(Connection conn, String fullTableName, boolean expectedValue) throws SQLException {
-        PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
-        assertEquals(expectedValue, pconn.getTable(new PTableKey(pconn.getTenantId(), fullTableName)).isImmutableRows());
-    }
-    
-    @Test
-    public void testAlterTableWithImmutability() throws Exception {
-
-        String query;
-        ResultSet rs;
-        String fullTableName = "T";
-
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(false);
-
-        conn.createStatement().execute(
-            "CREATE TABLE t (k VARCHAR NOT NULL PRIMARY KEY, v VARCHAR)  ");
-        
-        query = "SELECT * FROM t";
-        rs = conn.createStatement().executeQuery(query);
-        assertFalse(rs.next());
-
-        assertImmutableRows(conn,fullTableName, false);
-        conn.createStatement().execute("ALTER TABLE t SET IMMUTABLE_ROWS=true");
-        assertImmutableRows(conn,fullTableName, true);
-        
-        
-        conn.createStatement().execute("ALTER TABLE t SET immutable_rows=false");
-        assertImmutableRows(conn,fullTableName, false);
-    }
-    
-    @Test
-    public void testDeleteFromAllPKColumnIndex() throws Exception {
-        testDeleteFromAllPKColumnIndex(false);
-    }
-
-    @Test
-    public void testDeleteFromAllPKColumnLocalIndex() throws Exception {
-        testDeleteFromAllPKColumnIndex(true);
-    }
-
-    public void testDeleteFromAllPKColumnIndex(boolean localIndex) throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(false);
-        ensureTableCreated(getUrl(), INDEX_DATA_TABLE);
-        populateTestTable();
-        String ddl = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX IDX ON " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + INDEX_DATA_TABLE
-                    + " (long_pk, varchar_pk)"
-                    + " INCLUDE (long_col1, long_col2)";
-        PreparedStatement stmt = conn.prepareStatement(ddl);
-        stmt.execute();
-        
-        ResultSet rs;
-        
-        rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " +INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + INDEX_DATA_TABLE);
-        assertTrue(rs.next());
-        assertEquals(3,rs.getInt(1));
-        rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " +INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + "IDX");
-        assertTrue(rs.next());
-        assertEquals(3,rs.getInt(1));
-        
-        String dml = "DELETE from " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + INDEX_DATA_TABLE +
-                " WHERE long_col2 = 4";
-        assertEquals(1,conn.createStatement().executeUpdate(dml));
-        conn.commit();
-        
-        String query = "SELECT /*+ NO_INDEX */ long_pk FROM " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + INDEX_DATA_TABLE;
-        rs = conn.createStatement().executeQuery(query);
-        assertTrue(rs.next());
-        assertEquals(1L, rs.getLong(1));
-        assertTrue(rs.next());
-        assertEquals(3L, rs.getLong(1));
-        assertFalse(rs.next());
-        
-        query = "SELECT long_pk FROM " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + INDEX_DATA_TABLE;
-        rs = conn.createStatement().executeQuery(query);
-        assertTrue(rs.next());
-        assertEquals(1L, rs.getLong(1));
-        assertTrue(rs.next());
-        assertEquals(3L, rs.getLong(1));
-        assertFalse(rs.next());
-        
-        query = "SELECT * FROM " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + "IDX" ;
-        rs = conn.createStatement().executeQuery(query);
-        assertTrue(rs.next());
-        assertEquals(1L, rs.getLong(1));
-        assertTrue(rs.next());
-        assertEquals(3L, rs.getLong(1));
-        assertFalse(rs.next());
-        
-        conn.createStatement().execute("DROP INDEX IDX ON " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + INDEX_DATA_TABLE);
-    }
-    
-    @Test
     public void testDropIfImmutableKeyValueColumn() throws Exception {
-        testDropIfImmutableKeyValueColumn(false);
-    }
-
-    @Test
-    public void testDropIfImmutableKeyValueColumnWithLocalIndex() throws Exception {
-        testDropIfImmutableKeyValueColumn(true);
-    }
-
-    public void testDropIfImmutableKeyValueColumn(boolean localIndex) throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(false);
-        ensureTableCreated(getUrl(), INDEX_DATA_TABLE);
-        populateTestTable();
-        String ddl = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX IDX ON " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + INDEX_DATA_TABLE
-                    + " (long_col1)";
-        PreparedStatement stmt = conn.prepareStatement(ddl);
-        stmt.execute();
-        
-        ResultSet rs;
-        
-        rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " +INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + INDEX_DATA_TABLE);
-        assertTrue(rs.next());
-        assertEquals(3,rs.getInt(1));
-        rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " +INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + "IDX");
-        assertTrue(rs.next());
-        assertEquals(3,rs.getInt(1));
-        
-        conn.setAutoCommit(true);
-        String dml = "DELETE from " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + INDEX_DATA_TABLE +
-                " WHERE long_col2 = 4";
-        try {
-            conn.createStatement().execute(dml);
-            fail();
-        } catch (SQLException e) {
-            assertEquals(SQLExceptionCode.INVALID_FILTER_ON_IMMUTABLE_ROWS.getErrorCode(), e.getErrorCode());
-        }
-            
-        conn.createStatement().execute("DROP TABLE " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + INDEX_DATA_TABLE);
-    }
-    
-    @Test
-    public void testGroupByCount() throws Exception {
-        testGroupByCount(false);
-    }
-
-    @Test
-    public void testGroupByCountWithLocalIndex() throws Exception {
-        testGroupByCount(true);
-    }
-
-    public void testGroupByCount(boolean localIndex) throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(false);
-        ensureTableCreated(getUrl(), INDEX_DATA_TABLE);
-        populateTestTable();
-        String ddl = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX IDX ON " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + INDEX_DATA_TABLE
-                + " (int_col2)";
-        PreparedStatement stmt = conn.prepareStatement(ddl);
-        stmt.execute();
-        
-        ResultSet rs;
-        
-        rs = conn.createStatement().executeQuery("SELECT int_col2, COUNT(*) FROM " +INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + INDEX_DATA_TABLE + " GROUP BY int_col2");
-        assertTrue(rs.next());
-        assertEquals(1,rs.getInt(2));
-    }
-    
-    @Test   
-    public void testSelectDistinctOnTableWithSecondaryImmutableIndex() throws Exception {
-        testSelectDistinctOnTableWithSecondaryImmutableIndex(false);
-    }
-
-    @Test   
-    public void testSelectDistinctOnTableWithSecondaryImmutableLocalIndex() throws Exception {
-        testSelectDistinctOnTableWithSecondaryImmutableIndex(true);
-    }
-
-    public void testSelectDistinctOnTableWithSecondaryImmutableIndex(boolean localIndex) throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        ensureTableCreated(getUrl(), INDEX_DATA_TABLE);
-        populateTestTable();
-        String ddl = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX IDX ON " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + INDEX_DATA_TABLE
-                + " (int_col2)";
-        Connection conn = null;
-        PreparedStatement stmt = null;
-
-        try {
-            try {
-                conn = DriverManager.getConnection(getUrl(), props);
-                conn.setAutoCommit(false);
-                stmt = conn.prepareStatement(ddl);
-                stmt.execute();
-                ResultSet rs = conn.createStatement().executeQuery("SELECT distinct int_col2 FROM " +INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + INDEX_DATA_TABLE + " where int_col2 > 0");
-                assertTrue(rs.next());
-                assertEquals(3, rs.getInt(1));
-                assertTrue(rs.next());
-                assertEquals(4, rs.getInt(1));
-                assertTrue(rs.next());
-                assertEquals(5, rs.getInt(1));
-                assertFalse(rs.next());
-            } finally {
-                if (stmt != null) {
-                    stmt.close();
-                }
-            } 
-        } finally {
-            if (conn != null) {
-                conn.close();
-            }
+    	Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+	        conn.setAutoCommit(false);
+	        // create unique table and index names for each parameterized test
+	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
+	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
+	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+	        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+	        String ddl ="CREATE TABLE " + fullTableName + BaseTest.TEST_TABLE_SCHEMA + tableDDLOptions;
+	        Statement stmt = conn.createStatement();
+	        stmt.execute(ddl);
+	        populateTestTable(fullTableName);
+	        ddl = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName + " ON " + fullTableName + " (long_col1)";
+	        stmt.execute(ddl);
+	        
+	        ResultSet rs;
+	        
+	        rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullTableName);
+	        assertTrue(rs.next());
+	        assertEquals(3,rs.getInt(1));
+	        rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullIndexName);
+	        assertTrue(rs.next());
+	        assertEquals(3,rs.getInt(1));
+	        
+	        conn.setAutoCommit(true);
+	        String dml = "DELETE from " + fullTableName + " WHERE long_col2 = 4";
+	        try {
+	            conn.createStatement().execute(dml);
+	            fail();
+	        } catch (SQLException e) {
+	            assertEquals(SQLExceptionCode.INVALID_FILTER_ON_IMMUTABLE_ROWS.getErrorCode(), e.getErrorCode());
+	        }
+	            
+	        conn.createStatement().execute("DROP TABLE " + fullTableName);
         }
     }
     
-    @Test
-    public void testInClauseWithIndexOnColumnOfUsignedIntType() throws Exception {
-        testInClauseWithIndexOnColumnOfUsignedIntType(false);
-    }
 
-    @Test
-    public void testInClauseWithLocalIndexOnColumnOfUsignedIntType() throws Exception {
-        testInClauseWithIndexOnColumnOfUsignedIntType(true);
-    }
-
-    public void testInClauseWithIndexOnColumnOfUsignedIntType(boolean localIndex) throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = null;
-        PreparedStatement stmt = null;
-        ensureTableCreated(getUrl(), INDEX_DATA_TABLE);
-        populateTestTable();
-        String ddl = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX IDX ON " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + INDEX_DATA_TABLE
-                + " (int_col1)";
-        try {
-            try {
-                conn = DriverManager.getConnection(getUrl(), props);
-                conn.setAutoCommit(false);
-                stmt = conn.prepareStatement(ddl);
-                stmt.execute();
-                ResultSet rs = conn.createStatement().executeQuery("SELECT int_col1 FROM " +INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + INDEX_DATA_TABLE + " where int_col1 IN (1, 2, 3, 4)");
-                assertTrue(rs.next());
-                assertEquals(2, rs.getInt(1));
-                assertTrue(rs.next());
-                assertEquals(3, rs.getInt(1));
-                assertTrue(rs.next());
-                assertEquals(4, rs.getInt(1));
-                assertFalse(rs.next());
-            } finally {
-                if(stmt != null) {
-                    stmt.close();
-                }
-            } 
-        } finally {
-            if(conn != null) {
-                conn.close();
-            }
-        }
-    }
 }


[2/4] phoenix git commit: Increase testing around transaction integration PHOENIX-1900

Posted by td...@apache.org.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
new file mode 100644
index 0000000..127c988
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
@@ -0,0 +1,602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Properties;
+
+import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.primitives.Doubles;
+
+@RunWith(Parameterized.class)
+public class MutableIndexIT extends BaseHBaseManagedTimeIT {
+    
+    protected final boolean localIndex;
+    private final String tableDDLOptions;
+	
+    public MutableIndexIT(boolean localIndex, boolean transactional) {
+		this.localIndex = localIndex;
+		StringBuilder optionBuilder = new StringBuilder();
+		if (transactional) {
+			optionBuilder.append("TRANSACTIONAL=true");
+		}
+		this.tableDDLOptions = optionBuilder.toString();
+	}
+	
+	@Parameters(name="localIndex = {0} , transactional = {1}")
+    public static Collection<Boolean[]> data() {
+        return Arrays.asList(new Boolean[][] {     
+                 { false, false }, { false, true }, { true, false }, { true, true }
+           });
+    }
+    
+    @Test
+    public void testCoveredColumnUpdates() throws Exception {
+    	Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+	        conn.setAutoCommit(false);
+	        // create unique table and index names for each parameterized test
+	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
+	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
+	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+	        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+	        
+            createMultiCFTestTable(fullTableName, tableDDLOptions);
+            populateMultiCFTestTable(fullTableName);
+            PreparedStatement stmt = conn.prepareStatement("CREATE " + (localIndex ? " LOCAL " : "") + " INDEX " + indexName + " ON " + fullTableName 
+            		+ " (char_col1 ASC, int_col1 ASC) INCLUDE (long_col1, long_col2)");
+            stmt.execute();
+            
+            String query = "SELECT char_col1, int_col1, long_col2 from " + fullTableName;
+            ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+            if (localIndex) {
+                assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + fullTableName +" [-32768]\nCLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
+            } else {
+                assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullIndexName, QueryUtil.getExplainPlan(rs));
+            }
+            
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals("chara", rs.getString(1));
+            assertEquals(2, rs.getInt(2));
+            assertEquals(3L, rs.getLong(3));
+            assertTrue(rs.next());
+            assertEquals("chara", rs.getString(1));
+            assertEquals(3, rs.getInt(2));
+            assertEquals(4L, rs.getLong(3));
+            assertTrue(rs.next());
+            assertEquals("chara", rs.getString(1));
+            assertEquals(4, rs.getInt(2));
+            assertEquals(5L, rs.getLong(3));
+            assertFalse(rs.next());
+            
+            stmt = conn.prepareStatement("UPSERT INTO " + fullTableName
+                    + "(varchar_pk, char_pk, int_pk, long_pk , decimal_pk, long_col2) SELECT varchar_pk, char_pk, int_pk, long_pk , decimal_pk, long_col2*2 FROM "
+                    + fullTableName + " WHERE long_col2=?");
+            stmt.setLong(1,4L);
+            assertEquals(1,stmt.executeUpdate());
+            conn.commit();
+
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals("chara", rs.getString(1));
+            assertEquals(2, rs.getInt(2));
+            assertEquals(3L, rs.getLong(3));
+            assertTrue(rs.next());
+            assertEquals("chara", rs.getString(1));
+            assertEquals(3, rs.getInt(2));
+            assertEquals(8L, rs.getLong(3));
+            assertTrue(rs.next());
+            assertEquals("chara", rs.getString(1));
+            assertEquals(4, rs.getInt(2));
+            assertEquals(5L, rs.getLong(3));
+            assertFalse(rs.next());
+            
+            stmt = conn.prepareStatement("UPSERT INTO " + fullTableName
+                    + "(varchar_pk, char_pk, int_pk, long_pk , decimal_pk, long_col2) SELECT varchar_pk, char_pk, int_pk, long_pk , decimal_pk, null FROM "
+                    + fullTableName + " WHERE long_col2=?");
+            stmt.setLong(1,3L);
+            assertEquals(1,stmt.executeUpdate());
+            conn.commit();
+            
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals("chara", rs.getString(1));
+            assertEquals(2, rs.getInt(2));
+            assertEquals(0, rs.getLong(3));
+            assertTrue(rs.wasNull());
+            assertTrue(rs.next());
+            assertEquals("chara", rs.getString(1));
+            assertEquals(3, rs.getInt(2));
+            assertEquals(8L, rs.getLong(3));
+            assertTrue(rs.next());
+            assertEquals("chara", rs.getString(1));
+            assertEquals(4, rs.getInt(2));
+            assertEquals(5L, rs.getLong(3));
+            assertFalse(rs.next());
+            if(localIndex) {
+                query = "SELECT b.* from " + fullTableName + " where int_col1 = 4";
+                rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+                assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + fullTableName +" [-32768]\n" +
+                		"    SERVER FILTER BY TO_INTEGER(\"INT_COL1\") = 4\nCLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
+                rs = conn.createStatement().executeQuery(query);
+                assertTrue(rs.next());
+                assertEquals("varchar_b", rs.getString(1));
+                assertEquals("charb", rs.getString(2));
+                assertEquals(5, rs.getInt(3));
+                assertEquals(5, rs.getLong(4));
+                assertFalse(rs.next());
+                
+            }
+        } 
+    }
+    
+    @Test
+    public void testCoveredColumns() throws Exception {
+    	Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+	        conn.setAutoCommit(false);
+	        String query;
+	        ResultSet rs;
+	        // create unique table and index names for each parameterized test
+	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
+	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
+	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+	        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+	        conn.createStatement().execute("CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + tableDDLOptions);
+	        query = "SELECT * FROM " + fullTableName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertFalse(rs.next());
+	        
+	        conn.createStatement().execute("CREATE " + (localIndex ? " LOCAL " : "") + " INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
+	        query = "SELECT * FROM " + fullIndexName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertFalse(rs.next());
+	
+	        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
+	        stmt.setString(1,"a");
+	        stmt.setString(2, "x");
+	        stmt.setString(3, "1");
+	        stmt.execute();
+	        conn.commit();
+	        
+	        query = "SELECT * FROM " + fullIndexName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+	        assertEquals("x",rs.getString(1));
+	        assertEquals("a",rs.getString(2));
+	        assertEquals("1",rs.getString(3));
+	        assertFalse(rs.next());
+	
+	        stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + "(k,v2) VALUES(?,?)");
+	        stmt.setString(1,"a");
+	        stmt.setString(2, null);
+	        stmt.execute();
+	        conn.commit();
+	        
+	        query = "SELECT * FROM " + fullIndexName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+	        assertEquals("x",rs.getString(1));
+	        assertEquals("a",rs.getString(2));
+	        assertNull(rs.getString(3));
+	        assertFalse(rs.next());
+	
+	        query = "SELECT * FROM " + fullTableName;
+	        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+	        if(localIndex) {
+	            assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + fullTableName+" [-32768]\nCLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));            
+	        } else {
+	            assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullIndexName, QueryUtil.getExplainPlan(rs));
+	        }
+	
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+	        assertEquals("a",rs.getString(1));
+	        assertEquals("x",rs.getString(2));
+	        assertNull(rs.getString(3));
+	        assertFalse(rs.next());
+	
+	        stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + "(k,v2) VALUES(?,?)");
+	        stmt.setString(1,"a");
+	        stmt.setString(2,"3");
+	        stmt.execute();
+	        conn.commit();
+	        
+	        query = "SELECT * FROM " + fullTableName;
+	        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+	        if(localIndex) {
+	            assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + fullTableName + " [-32768]\nCLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));            
+	        } else {
+	            assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullIndexName, QueryUtil.getExplainPlan(rs));
+	        }
+	        
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+	        assertEquals("a",rs.getString(1));
+	        assertEquals("x",rs.getString(2));
+	        assertEquals("3",rs.getString(3));
+	        assertFalse(rs.next());
+	
+	        stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + "(k,v2) VALUES(?,?)");
+	        stmt.setString(1,"a");
+	        stmt.setString(2,"4");
+	        stmt.execute();
+	        conn.commit();
+	        
+	        query = "SELECT * FROM " + fullTableName;
+	        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+	        if(localIndex) {
+	            assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + fullTableName+" [-32768]\nCLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));            
+	        } else {
+	            assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullIndexName, QueryUtil.getExplainPlan(rs));
+	        }
+	        
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+	        assertEquals("a",rs.getString(1));
+	        assertEquals("x",rs.getString(2));
+	        assertEquals("4",rs.getString(3));
+	        assertFalse(rs.next());
+        }
+    }
+
+    @Test
+    public void testCompoundIndexKey() throws Exception {
+    	Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+	        conn.setAutoCommit(false);
+	        String query;
+	        ResultSet rs;
+	        // create unique table and index names for each parameterized test
+	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
+	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
+	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+	        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+	
+	        // make sure that the tables are empty, but reachable
+	        conn.createStatement().execute("CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + tableDDLOptions);
+	        query = "SELECT * FROM " + fullTableName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertFalse(rs.next());
+	        conn.createStatement().execute("CREATE " + (localIndex ? " LOCAL " : "") + " INDEX " + indexName + " ON " + fullTableName + " (v1, v2)");
+	        query = "SELECT * FROM " + fullIndexName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertFalse(rs.next());
+	
+	        // load some data into the table
+	        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
+	        stmt.setString(1,"a");
+	        stmt.setString(2, "x");
+	        stmt.setString(3, "1");
+	        stmt.execute();
+	        conn.commit();
+	        
+	        query = "SELECT * FROM " + fullIndexName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+	        assertEquals("x",rs.getString(1));
+	        assertEquals("1",rs.getString(2));
+	        assertEquals("a",rs.getString(3));
+	        assertFalse(rs.next());
+	
+	        stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
+	        stmt.setString(1,"a");
+	        stmt.setString(2, "y");
+	        stmt.setString(3, null);
+	        stmt.execute();
+	        conn.commit();
+	        
+	        query = "SELECT * FROM " + fullIndexName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+	        assertEquals("y",rs.getString(1));
+	        assertNull(rs.getString(2));
+	        assertEquals("a",rs.getString(3));
+	        assertFalse(rs.next());
+	
+	        query = "SELECT * FROM " + fullTableName;
+	        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+	        if (localIndex) {
+	            assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + fullTableName+" [-32768]\n"
+	                    + "    SERVER FILTER BY FIRST KEY ONLY\n"
+	                    + "CLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
+	        } else {
+	            assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullIndexName + "\n"
+	                       + "    SERVER FILTER BY FIRST KEY ONLY", QueryUtil.getExplainPlan(rs));
+	        }
+	        //make sure the data table looks like what we expect
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+	        assertEquals("a",rs.getString(1));
+	        assertEquals("y",rs.getString(2));
+	        assertNull(rs.getString(3));
+	        assertFalse(rs.next());
+	        
+	        // Upsert new row with null leading index column
+	        stmt.setString(1,"b");
+	        stmt.setString(2, null);
+	        stmt.setString(3, "3");
+	        stmt.execute();
+	        conn.commit();
+	        
+	        query = "SELECT * FROM " + fullIndexName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+	        assertEquals(null,rs.getString(1));
+	        assertEquals("3",rs.getString(2));
+	        assertEquals("b",rs.getString(3));
+	        assertTrue(rs.next());
+	        assertEquals("y",rs.getString(1));
+	        assertNull(rs.getString(2));
+	        assertEquals("a",rs.getString(3));
+	        assertFalse(rs.next());
+	
+	        // Update row with null leading index column to have a value
+	        stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?)");
+	        stmt.setString(1,"b");
+	        stmt.setString(2, "z");
+	        stmt.execute();
+	        conn.commit();
+	        
+	        query = "SELECT * FROM " + fullIndexName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+	        assertEquals("y",rs.getString(1));
+	        assertNull(rs.getString(2));
+	        assertEquals("a",rs.getString(3));
+	        assertTrue(rs.next());
+	        assertEquals("z",rs.getString(1));
+	        assertEquals("3",rs.getString(2));
+	        assertEquals("b",rs.getString(3));
+	        assertFalse(rs.next());
+        }
+
+    }
+    
+    /**
+     * There was a case where if there were multiple updates to a single row in the same batch, the
+     * index wouldn't be updated correctly as each element of the batch was evaluated with the state
+     * previous to the batch, rather than with the rest of the batch. This meant you could do a put
+     * and a delete on a row in the same batch and the index result would contain the current + put
+     * and current + delete, but not current + put + delete.
+     * @throws Exception on failure
+     */
+    @Test
+    public void testMultipleUpdatesToSingleRow() throws Exception {
+    	Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+	        conn.setAutoCommit(false);
+	        String query;
+	        ResultSet rs;
+	        // create unique table and index names for each parameterized test
+	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
+	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
+	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+	        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+    
+	        // make sure that the tables are empty, but reachable
+	        conn.createStatement().execute(
+	          "CREATE TABLE " + fullTableName
+	              + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + tableDDLOptions);
+	        query = "SELECT * FROM " + fullTableName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertFalse(rs.next());
+	
+	        conn.createStatement().execute("CREATE " + (localIndex ? " LOCAL " : "") + " INDEX " + indexName + " ON " + fullTableName + " (v1, v2)");
+	        query = "SELECT * FROM " + fullIndexName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertFalse(rs.next());
+	    
+	        // load some data into the table
+	        PreparedStatement stmt =
+	            conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
+	        stmt.setString(1, "a");
+	        stmt.setString(2, "x");
+	        stmt.setString(3, "1");
+	        stmt.execute();
+	        conn.commit();
+	        
+	        // make sure the index is working as expected
+	        query = "SELECT * FROM " + fullIndexName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+	        assertEquals("x", rs.getString(1));
+	        assertEquals("1", rs.getString(2));
+	        assertEquals("a", rs.getString(3));
+	        assertFalse(rs.next());
+	      
+	        // do multiple updates to the same row, in the same batch
+	        stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + "(k, v1) VALUES(?,?)");
+	        stmt.setString(1, "a");
+	        stmt.setString(2, "y");
+	        stmt.execute();
+	        stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + "(k,v2) VALUES(?,?)");
+	        stmt.setString(1, "a");
+	        stmt.setString(2, null);
+	        stmt.execute();
+	        conn.commit();
+	    
+	        query = "SELECT * FROM " + fullIndexName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+	        assertEquals("y", rs.getString(1));
+	        assertNull(rs.getString(2));
+	        assertEquals("a", rs.getString(3));
+	        assertFalse(rs.next());
+	    
+	        query = "SELECT * FROM " + fullTableName;
+	        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+	        if(localIndex) {
+	            assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + fullTableName+" [-32768]\n"
+	                    + "    SERVER FILTER BY FIRST KEY ONLY\n"
+	                    + "CLIENT MERGE SORT",
+	                QueryUtil.getExplainPlan(rs));
+	        } else {
+	            assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullIndexName + "\n"
+	                    + "    SERVER FILTER BY FIRST KEY ONLY",
+	                QueryUtil.getExplainPlan(rs));
+	        }
+	    
+	        // check that the data table matches as expected
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+	        assertEquals("a", rs.getString(1));
+	        assertEquals("y", rs.getString(2));
+	        assertNull(rs.getString(3));
+	        assertFalse(rs.next());
+        }
+    }
+    
+    @Test
+    public void testUpsertingNullForIndexedColumns() throws Exception {
+    	Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+	        conn.setAutoCommit(false);
+	        ResultSet rs;
+	        // create unique table and index names for each parameterized test
+	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
+	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
+	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+	        String fullIndexeName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+    		Statement stmt = conn.createStatement();
+    		stmt.execute("CREATE TABLE " + fullTableName + "(v1 VARCHAR PRIMARY KEY, v2 DOUBLE, v3 VARCHAR) "+tableDDLOptions);
+    		stmt.execute("CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName + " ON " + fullTableName + "  (v2) INCLUDE(v3)");
+    		
+    		//create a row with value null for indexed column v2
+    		stmt.executeUpdate("upsert into " + fullTableName + " values('cc1', null, 'abc')");
+    		conn.commit();
+    		
+    		//assert values in index table 
+    		rs = stmt.executeQuery("select * from " + fullIndexeName);
+    		assertTrue(rs.next());
+    		assertEquals(0, Doubles.compare(0, rs.getDouble(1)));
+    		assertTrue(rs.wasNull());
+    		assertEquals("cc1", rs.getString(2));
+    		assertEquals("abc", rs.getString(3));
+    		assertFalse(rs.next());
+    		
+    		//assert values in data table
+    		rs = stmt.executeQuery("select v1, v2, v3 from " + fullTableName);
+    		assertTrue(rs.next());
+    		assertEquals("cc1", rs.getString(1));
+    		assertEquals(0, Doubles.compare(0, rs.getDouble(2)));
+    		assertTrue(rs.wasNull());
+    		assertEquals("abc", rs.getString(3));
+    		assertFalse(rs.next());
+    		
+    		//update the previously null value for indexed column v2 to a non-null value 1.23
+    		stmt.executeUpdate("upsert into " + fullTableName + " values('cc1', 1.23, 'abc')");
+    		conn.commit();
+    		
+    		//assert values in data table
+    		rs = stmt.executeQuery("select /*+ NO_INDEX */ v1, v2, v3 from " + fullTableName);
+    		assertTrue(rs.next());
+    		assertEquals("cc1", rs.getString(1));
+    		assertEquals(0, Doubles.compare(1.23, rs.getDouble(2)));
+    		assertEquals("abc", rs.getString(3));
+    		assertFalse(rs.next());
+    		
+    		//assert values in index table 
+    		rs = stmt.executeQuery("select * from " + fullIndexeName);
+    		assertTrue(rs.next());
+    		assertEquals(0, Doubles.compare(1.23, rs.getDouble(1)));
+    		assertEquals("cc1", rs.getString(2));
+    		assertEquals("abc", rs.getString(3));
+    		assertFalse(rs.next());
+    		
+    		//update the value for indexed column v2 back to null
+    		stmt.executeUpdate("upsert into " + fullTableName + " values('cc1', null, 'abc')");
+    		conn.commit();
+    		
+    		//assert values in index table 
+    		rs = stmt.executeQuery("select * from " + fullIndexeName);
+    		assertTrue(rs.next());
+    		assertEquals(0, Doubles.compare(0, rs.getDouble(1)));
+    		assertTrue(rs.wasNull());
+    		assertEquals("cc1", rs.getString(2));
+    		assertEquals("abc", rs.getString(3));
+    		assertFalse(rs.next());
+    		
+    		//assert values in data table
+    		rs = stmt.executeQuery("select v1, v2, v3 from " + fullTableName);
+    		assertTrue(rs.next());
+    		assertEquals("cc1", rs.getString(1));
+    		assertEquals(0, Doubles.compare(0, rs.getDouble(2)));
+    		assertEquals("abc", rs.getString(3));
+    		assertFalse(rs.next());
+    	} 
+    }
+    
+	
+    private void assertImmutableRows(Connection conn, String fullTableName, boolean expectedValue) throws SQLException {
+        PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
+        assertEquals(expectedValue, pconn.getTable(new PTableKey(pconn.getTenantId(), fullTableName)).isImmutableRows());
+    }
+    
+    @Test
+    public void testAlterTableWithImmutability() throws Exception {
+        String query;
+        ResultSet rs;
+        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
+        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+	        conn.setAutoCommit(false);
+	        conn.createStatement().execute(
+	            "CREATE TABLE " + fullTableName +" (k VARCHAR NOT NULL PRIMARY KEY, v VARCHAR) " + tableDDLOptions);
+	        
+	        query = "SELECT * FROM " + fullTableName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertFalse(rs.next());
+	
+	        assertImmutableRows(conn,fullTableName, false);
+	        conn.createStatement().execute("ALTER TABLE " + fullTableName +" SET IMMUTABLE_ROWS=true");
+	        assertImmutableRows(conn,fullTableName, true);
+	        
+	        
+	        conn.createStatement().execute("ALTER TABLE " + fullTableName +" SET immutable_rows=false");
+	        assertImmutableRows(conn,fullTableName, false);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java
deleted file mode 100644
index 5196b0a..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.end2end.index;
-
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.Statement;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.phoenix.end2end.Shadower;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import com.google.common.collect.Maps;
-
-public class TxGlobalMutableIndexIT extends GlobalMutableIndexIT {
-    
-    @BeforeClass
-    @Shadower(classBeingShadowed = BaseMutableIndexIT.class)
-    public static void doSetup() throws Exception {
-        Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
-        // Don't split intra region so we can more easily know that the n-way parallelization is for the explain plan
-        // Forces server cache to be used
-        props.put(QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB, Integer.toString(2));
-        props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
-        props.put(QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB, Boolean.toString(true));
-        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
-    }
-    
-    @Test
-    public void testRollbackOfUncommittedIndexChange() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(false);
-        try {
-            Statement stmt = conn.createStatement();
-            stmt.execute("CREATE TABLE DEMO(v1 VARCHAR PRIMARY KEY, v2 VARCHAR, v3 VARCHAR)");
-            stmt.execute("CREATE INDEX DEMO_idx ON DEMO (v2) INCLUDE(v3)");
-            
-            stmt.executeUpdate("upsert into DEMO values('x', 'y', 'a')");
-            
-            //assert values in data table
-            ResultSet rs = stmt.executeQuery("select v1, v2, v3 from DEMO");
-            assertTrue(rs.next());
-            assertEquals("x", rs.getString(1));
-            assertEquals("y", rs.getString(2));
-            assertEquals("a", rs.getString(3));
-            assertFalse(rs.next());
-            
-            conn.rollback();
-            
-            //assert values in data table
-            rs = stmt.executeQuery("select v1, v2, v3 from DEMO");
-            assertFalse(rs.next());
-            
-        } finally {
-            conn.close();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxImmutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxImmutableIndexIT.java
deleted file mode 100644
index e9d685f..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxImmutableIndexIT.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.end2end.index;
-
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.Statement;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
-import org.apache.phoenix.end2end.Shadower;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import com.google.common.collect.Maps;
-
-public class TxImmutableIndexIT extends ImmutableIndexIT {
-    
-    @BeforeClass
-    @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
-    public static void doSetup() throws Exception {
-        Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
-        // Don't split intra region so we can more easily know that the n-way parallelization is for the explain plan
-        // Forces server cache to be used
-        props.put(QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB, Boolean.toString(true));
-        // We need this b/c we don't allow a transactional table to be created if the underlying
-        // HBase table already exists (since we don't know if it was transactional before).
-        props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
-        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
-    }
-    
-    // TODO: need test case with mix of mutable and immutable indexes
-    @Test
-    public void testRollbackOfUncommittedKeyValueIndexChange() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(false);
-        try {
-            Statement stmt = conn.createStatement();
-            stmt.execute("CREATE TABLE DEMO(v1 VARCHAR PRIMARY KEY, v2 VARCHAR, v3 VARCHAR) IMMUTABLE_ROWS=true");
-            stmt.execute("CREATE INDEX DEMO_idx ON DEMO (v2) INCLUDE(v3)");
-            
-            stmt.executeUpdate("upsert into DEMO values('x', 'y', 'a')");
-            
-            //assert values in data table
-            ResultSet rs = stmt.executeQuery("select v1, v2, v3 from DEMO");
-            assertTrue(rs.next());
-            assertEquals("x", rs.getString(1));
-            assertEquals("y", rs.getString(2));
-            assertEquals("a", rs.getString(3));
-            assertFalse(rs.next());
-            
-            conn.rollback();
-            
-            //assert values in data table
-            rs = stmt.executeQuery("select v1, v2, v3 from DEMO");
-            assertFalse(rs.next());
-            
-        } finally {
-            conn.close();
-        }
-    }
-    
-    // TODO: need test case with mix of mutable and immutable indexes
-    @Test
-    public void testRollbackOfUncommittedRowKeyIndexChange() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(false);
-        try {
-            Statement stmt = conn.createStatement();
-            stmt.execute("CREATE TABLE DEMO(v1 VARCHAR, v2 VARCHAR, v3 VARCHAR, CONSTRAINT pk PRIMARY KEY (v1, v2)) IMMUTABLE_ROWS=true");
-            stmt.execute("CREATE INDEX DEMO_idx ON DEMO (v2, v1)");
-            
-            stmt.executeUpdate("upsert into DEMO values('x', 'y', 'a')");
-            
-            //assert values in data table
-            ResultSet rs = stmt.executeQuery("select v1, v2, v3 from DEMO");
-            assertTrue(rs.next());
-            assertEquals("x", rs.getString(1));
-            assertEquals("y", rs.getString(2));
-            assertEquals("a", rs.getString(3));
-            assertFalse(rs.next());
-            
-            conn.rollback();
-            
-            //assert values in data table
-            rs = stmt.executeQuery("select v1, v2, v3 from DEMO");
-            assertFalse(rs.next());
-            
-        } finally {
-            conn.close();
-        }
-    }
-	
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/MutableRollbackIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/MutableRollbackIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/MutableRollbackIT.java
new file mode 100644
index 0000000..4131c2d
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/MutableRollbackIT.java
@@ -0,0 +1,260 @@
+package org.apache.phoenix.end2end.index.txn;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.end2end.Shadower;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Maps;
+
+@RunWith(Parameterized.class)
+public class MutableRollbackIT extends BaseHBaseManagedTimeIT {
+	
+	private final boolean localIndex;
+
+	public MutableRollbackIT(boolean localIndex) {
+		this.localIndex = localIndex;
+	}
+	
+	@BeforeClass
+    @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
+    public static void doSetup() throws Exception {
+        Map<String,String> props = Maps.newHashMapWithExpectedSize(2);
+        props.put(QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB, Boolean.toString(true));
+        // We need this b/c we don't allow a transactional table to be created if the underlying
+        // HBase table already exists (since we don't know if it was transactional before).
+        props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+	
+	@Parameters(name="localIndex = {0}")
+    public static Collection<Boolean> data() {
+        return Arrays.asList(new Boolean[] {     
+                 false, true  
+           });
+    }
+	
+	@Test
+    public void testRollbackOfUncommittedExistingKeyValueIndexUpdate() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(false);
+        try {
+            Statement stmt = conn.createStatement();
+            stmt.execute("CREATE TABLE DEMO1(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+            stmt.execute("CREATE TABLE DEMO2(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) IMMUTABLE_ROWS=true");
+            stmt.execute("CREATE "+(localIndex? " LOCAL " : "")+"INDEX DEMO1_idx ON DEMO1 (v1) INCLUDE(v2)");
+            stmt.execute("CREATE "+(localIndex? " LOCAL " : "")+"INDEX DEMO2_idx ON DEMO2 (v1) INCLUDE(v2)");
+            
+            stmt.executeUpdate("upsert into DEMO1 values('x', 'y', 'a')");
+            conn.commit();
+            
+            //assert rows exists in DEMO1
+            ResultSet rs = stmt.executeQuery("select k, v1, v2 from DEMO1");
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("y", rs.getString(2));
+            assertEquals("a", rs.getString(3));
+            assertFalse(rs.next());
+            
+            //assert rows exists in DEMO1_idx
+            rs = stmt.executeQuery("select k, v1, v2 from DEMO1 ORDER BY v1");
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("y", rs.getString(2));
+            assertEquals("a", rs.getString(3));
+            assertFalse(rs.next());
+            
+            //assert no rows exists in DEMO2
+            rs = stmt.executeQuery("select k, v1, v2 from DEMO2");
+            assertFalse(rs.next());
+            
+            //assert no rows exists in DEMO2_idx
+            rs = stmt.executeQuery("select k, v1 from DEMO2 ORDER BY v1");
+            assertFalse(rs.next());
+            
+            stmt.executeUpdate("upsert into DEMO1 values('x', 'y', 'b')");
+            stmt.executeUpdate("upsert into DEMO2 values('a', 'b', 'c')");
+            
+            //assert new covered column value 
+            rs = stmt.executeQuery("select k, v1, v2 from DEMO1");
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("y", rs.getString(2));
+            assertEquals("b", rs.getString(3));
+            assertFalse(rs.next());
+            
+            //assert new covered column value 
+            rs = stmt.executeQuery("select k, v1, v2 from DEMO1 ORDER BY v1");
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("y", rs.getString(2));
+            assertEquals("b", rs.getString(3));
+            assertFalse(rs.next());
+            
+            //assert rows exists in DEMO2
+            rs = stmt.executeQuery("select k, v1, v2 from DEMO2");
+            assertTrue(rs.next());
+            assertEquals("a", rs.getString(1));
+            assertEquals("b", rs.getString(2));
+            assertEquals("c", rs.getString(3));
+            assertFalse(rs.next());
+            
+            //assert rows exists in DEMO2 index table
+            rs = stmt.executeQuery("select k, v1 from DEMO2 ORDER BY v1");
+            assertTrue(rs.next());
+            assertEquals("a", rs.getString(1));
+            assertEquals("b", rs.getString(2));
+            assertFalse(rs.next());
+            
+            conn.rollback();
+            
+            //assert original row exists in DEMO1
+            rs = stmt.executeQuery("select k, v1, v2 from DEMO1");
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("y", rs.getString(2));
+            assertEquals("a", rs.getString(3));
+            assertFalse(rs.next());
+            
+            //assert original row exists in DEMO1_idx
+            rs = stmt.executeQuery("select k, v1, v2 from DEMO1 ORDER BY v1");
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("y", rs.getString(2));
+            assertEquals("a", rs.getString(3));
+            assertFalse(rs.next());
+            
+            //assert no rows exists in DEMO2
+            rs = stmt.executeQuery("select k, v1, v2 from DEMO2");
+            assertFalse(rs.next());
+            
+            //assert no rows exists in DEMO2_idx
+            rs = stmt.executeQuery("select k, v1 from DEMO2 ORDER BY v1");
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+	@Test
+    public void testRollbackOfUncommittedExistingRowKeyIndexUpdate() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(false);
+        try {
+            Statement stmt = conn.createStatement();
+            stmt.execute("CREATE TABLE DEMO1(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+            stmt.execute("CREATE TABLE DEMO2(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) IMMUTABLE_ROWS=true");
+            stmt.execute("CREATE "+(localIndex? " LOCAL " : "")+"INDEX DEMO1_idx ON DEMO1 (v1, k)");
+            stmt.execute("CREATE "+(localIndex? " LOCAL " : "")+"INDEX DEMO2_idx ON DEMO2 (v1, k)");
+            
+            stmt.executeUpdate("upsert into DEMO1 values('x', 'y', 'a')");
+            conn.commit();
+            
+            //assert rows exists in DEMO1 
+            ResultSet rs = stmt.executeQuery("select k, v1, v2 from DEMO1");
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("y", rs.getString(2));
+            assertEquals("a", rs.getString(3));
+            assertFalse(rs.next());
+            
+            //assert rows exists in DEMO1_idx
+            rs = stmt.executeQuery("select k, v1 from DEMO1 ORDER BY v1");
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("y", rs.getString(2));
+            assertFalse(rs.next());
+            
+            //assert no rows exists in DEMO2
+            rs = stmt.executeQuery("select k, v1, v2 from DEMO2");
+            assertFalse(rs.next());
+            
+            //assert no rows exists in DEMO2_idx
+            rs = stmt.executeQuery("select k, v1 from DEMO2 ORDER BY v1");
+            assertFalse(rs.next());
+            
+            stmt.executeUpdate("upsert into DEMO1 values('x', 'z', 'a')");
+            stmt.executeUpdate("upsert into DEMO2 values('a', 'b', 'c')");
+            
+            //assert new covered row key value exists in DEMO1
+            rs = stmt.executeQuery("select k, v1, v2 from DEMO1");
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("z", rs.getString(2));
+            assertEquals("a", rs.getString(3));
+            assertFalse(rs.next());
+            
+            //assert new covered row key value exists in DEMO1_idx
+            rs = stmt.executeQuery("select k, v1 from DEMO1 ORDER BY v1");
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("z", rs.getString(2));
+            assertFalse(rs.next());
+            
+            //assert rows exists in DEMO2
+            rs = stmt.executeQuery("select k, v1, v2 from DEMO2");
+            assertTrue(rs.next());
+            assertEquals("a", rs.getString(1));
+            assertEquals("b", rs.getString(2));
+            assertEquals("c", rs.getString(3));
+            assertFalse(rs.next());
+            
+            //assert rows exists in DEMO2 index table
+            rs = stmt.executeQuery("select k, v1 from DEMO2 ORDER BY v1");
+            assertTrue(rs.next());
+            assertEquals("a", rs.getString(1));
+            assertEquals("b", rs.getString(2));
+            assertFalse(rs.next());
+            
+            conn.rollback();
+            
+            //assert original row exists in DEMO1
+            rs = stmt.executeQuery("select k, v1, v2 from DEMO1");
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("y", rs.getString(2));
+            assertEquals("a", rs.getString(3));
+            assertFalse(rs.next());
+            
+            //assert original row exists in DEMO1_idx
+            rs = stmt.executeQuery("select k, v1 from DEMO1 ORDER BY v1");
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("y", rs.getString(2));
+            assertFalse(rs.next());
+            
+            //assert no rows exists in DEMO2
+            rs = stmt.executeQuery("select k, v1, v2 from DEMO2");
+            assertFalse(rs.next());
+            
+            //assert no rows exists in DEMO2_idx
+            rs = stmt.executeQuery("select k, v1 from DEMO2 ORDER BY v1");
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/RollbackIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/RollbackIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/RollbackIT.java
new file mode 100644
index 0000000..b7ec3c6
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/RollbackIT.java
@@ -0,0 +1,144 @@
+package org.apache.phoenix.end2end.index.txn;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.end2end.Shadower;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Maps;
+
+@RunWith(Parameterized.class)
+public class RollbackIT extends BaseHBaseManagedTimeIT {
+	
+	private final boolean localIndex;
+	private final boolean mutable;
+
+	public RollbackIT(boolean localIndex, boolean mutable) {
+		this.localIndex = localIndex;
+		this.mutable = mutable;
+	}
+	
+	@BeforeClass
+    @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
+    public static void doSetup() throws Exception {
+        Map<String,String> props = Maps.newHashMapWithExpectedSize(2);
+        props.put(QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB, Boolean.toString(true));
+        // We need this b/c we don't allow a transactional table to be created if the underlying
+        // HBase table already exists (since we don't know if it was transactional before).
+        props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+	
+	@Parameters(name="localIndex = {0} , mutable = {1}")
+    public static Collection<Boolean[]> data() {
+        return Arrays.asList(new Boolean[][] {     
+                 { false, false }, { false, true }, { true, false }, { true, true }  
+           });
+    }
+    
+    @Test
+    public void testRollbackOfUncommittedKeyValueIndexInsert() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(false);
+        try {
+            Statement stmt = conn.createStatement();
+            stmt.execute("CREATE TABLE DEMO(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"+(!mutable? " IMMUTABLE_ROWS=true" : ""));
+            stmt.execute("CREATE "+(localIndex? "LOCAL " : "")+"INDEX DEMO_idx ON DEMO (v1) INCLUDE(v2)");
+            
+            stmt.executeUpdate("upsert into DEMO values('x', 'y', 'a')");
+            
+            //assert values in data table
+            ResultSet rs = stmt.executeQuery("select k, v1, v2 from DEMO ORDER BY k");
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("y", rs.getString(2));
+            assertEquals("a", rs.getString(3));
+            assertFalse(rs.next());
+            
+            //assert values in index table
+            rs = stmt.executeQuery("select k, v1, v2  from DEMO ORDER BY v1");
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("y", rs.getString(2));
+            assertEquals("a", rs.getString(3));
+            assertFalse(rs.next());
+            
+            conn.rollback();
+            
+            //assert values in data table
+            rs = stmt.executeQuery("select k, v1, v2 from DEMO ORDER BY k");
+            assertFalse(rs.next());
+            
+            //assert values in index table
+            rs = stmt.executeQuery("select k, v1, v2 from DEMO ORDER BY v1");
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testRollbackOfUncommittedRowKeyIndexInsert() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(false);
+        try {
+            Statement stmt = conn.createStatement();
+            stmt.execute("CREATE TABLE DEMO(k VARCHAR, v1 VARCHAR, v2 VARCHAR, CONSTRAINT pk PRIMARY KEY (v1, v2))"+(!mutable? " IMMUTABLE_ROWS=true" : ""));
+            stmt.execute("CREATE "+(localIndex? "LOCAL " : "")+"INDEX DEMO_idx ON DEMO (v1, k)");
+            
+            stmt.executeUpdate("upsert into DEMO values('x', 'y', 'a')");
+
+            ResultSet rs = stmt.executeQuery("select k, v1, v2 from DEMO ORDER BY v1");
+            
+            //assert values in data table
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("y", rs.getString(2));
+            assertEquals("a", rs.getString(3));
+            assertFalse(rs.next());
+            
+            //assert values in index table
+            rs = stmt.executeQuery("select k, v1 from DEMO ORDER BY v2");
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("y", rs.getString(2));
+            assertFalse(rs.next());
+            
+            conn.rollback();
+            
+            //assert values in data table
+            rs = stmt.executeQuery("select k, v1, v2 from DEMO");
+            assertFalse(rs.next());
+            
+            //assert values in index table
+            rs = stmt.executeQuery("select k, v1 from DEMO ORDER BY v2");
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+    
+}
+

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java
new file mode 100644
index 0000000..205056b
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java
@@ -0,0 +1,198 @@
+package org.apache.phoenix.end2end.index.txn;
+
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
+import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
+import static org.apache.phoenix.util.TestUtil.LOCALHOST;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.hbase.index.Indexer;
+import org.apache.phoenix.jdbc.PhoenixTestDriver;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Maps;
+
+@RunWith(Parameterized.class)
+public class TxWriteFailureIT extends BaseTest {
+	
+    private static PhoenixTestDriver driver;
+    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+    private static final String SCHEMA_NAME = "S";
+    private static final String DATA_TABLE_NAME = "T";
+    private static final String INDEX_TABLE_NAME = "I";
+    private static final String DATA_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, DATA_TABLE_NAME);
+    private static final String INDEX_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, INDEX_TABLE_NAME);
+    private static final String ROW_TO_FAIL = "fail";
+    
+    private final boolean localIndex;
+	private final boolean mutable;
+
+	public TxWriteFailureIT(boolean localIndex, boolean mutable) {
+		this.localIndex = localIndex;
+		this.mutable = mutable;
+	}
+
+	@BeforeClass
+	public static void setupCluster() throws Exception {
+		Configuration conf = TEST_UTIL.getConfiguration();
+		setUpConfigForMiniCluster(conf);
+		conf.setClass("hbase.coprocessor.region.classes", FailingRegionObserver.class, RegionObserver.class);
+		conf.setBoolean("hbase.coprocessor.abortonerror", false);
+		conf.setBoolean(Indexer.CHECK_VERSION_CONF_KEY, false);
+		TEST_UTIL.startMiniCluster();
+		String clientPort = TEST_UTIL.getConfiguration().get(
+				QueryServices.ZOOKEEPER_PORT_ATTRIB);
+		url = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST
+				+ JDBC_PROTOCOL_SEPARATOR + clientPort
+				+ JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
+
+		Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+		// Must update config before starting server
+		props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
+		props.put(QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB, Boolean.toString(true));
+		driver = initAndRegisterDriver(url, new ReadOnlyProps(props.entrySet().iterator()));
+		clusterInitialized = true;
+		setupTxManager();
+	}
+	
+	@Parameters(name="localIndex = {0} , mutable = {1}")
+    public static Collection<Boolean[]> data() {
+        return Arrays.asList(new Boolean[][] {
+                 { false, false }, { false, true }, { true, false }, { true, true }
+           });
+    }
+	
+	@Test
+    public void testIndexTableWriteFailure() throws Exception {
+        helpTestWriteFailure(true);
+	}
+	
+	@Test
+    public void testDataTableWriteFailure() throws Exception {
+        helpTestWriteFailure(false);
+	}
+
+	private void helpTestWriteFailure(boolean indexTableWriteFailure) throws SQLException {
+		ResultSet rs;
+
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = driver.connect(url, props);
+        conn.setAutoCommit(false);
+        conn.createStatement().execute(
+                "CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR PRIMARY KEY, v1 VARCHAR)"+(!mutable? " IMMUTABLE_ROWS=true" : ""));
+        conn.createStatement().execute(
+                "CREATE "+(localIndex? "LOCAL " : "")+"INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME + " (v1)");
+        
+        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?)");
+        // to create a data table write failure set k as the ROW_TO_FAIL, to create an index table write failure set v1 as the ROW_TO_FAIL, 
+        // FailingRegionObserver will throw an exception if the put contains ROW_TO_FAIL
+        stmt.setString(1, !indexTableWriteFailure ? ROW_TO_FAIL : "k1");
+        stmt.setString(2, indexTableWriteFailure ? ROW_TO_FAIL : "k2");
+        stmt.execute();
+        stmt.setString(1, "k2");
+        stmt.setString(2, "v2");
+        stmt.execute();
+        try {
+        	conn.commit();
+        	fail();
+        }
+        catch (Exception e) {
+        	conn.rollback();
+        }
+        stmt.setString(1, "k3");
+        stmt.setString(2, "v3");
+        stmt.execute();
+        //this should pass
+        conn.commit();
+        
+        // verify that only k3,v3 exists in the data table
+        String dataSql = "SELECT k, v1 FROM " + DATA_TABLE_FULL_NAME + " order by k";
+        rs = conn.createStatement().executeQuery("EXPLAIN "+dataSql);
+        assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER S.T",
+                QueryUtil.getExplainPlan(rs));
+        rs = conn.createStatement().executeQuery(dataSql);
+        assertTrue(rs.next());
+        assertEquals("k3", rs.getString(1));
+        assertEquals("v3", rs.getString(2));
+        assertFalse(rs.next());
+
+        // verify the only k3,v3  exists in the index table
+        String indexSql = "SELECT k, v1 FROM " + DATA_TABLE_FULL_NAME + " order by v1";
+        rs = conn.createStatement().executeQuery("EXPLAIN "+indexSql);
+        if(localIndex) {
+            assertEquals(
+                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + DATA_TABLE_FULL_NAME + " [-32768]\n" + 
+                "    SERVER FILTER BY FIRST KEY ONLY\n" +
+                "CLIENT MERGE SORT",
+                QueryUtil.getExplainPlan(rs));
+        } else {
+	        assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + INDEX_TABLE_FULL_NAME + "\n    SERVER FILTER BY FIRST KEY ONLY",
+	                QueryUtil.getExplainPlan(rs));
+        }
+        rs = conn.createStatement().executeQuery(indexSql);
+        assertTrue(rs.next());
+        assertEquals("k3", rs.getString(1));
+        assertEquals("v3", rs.getString(2));
+        assertFalse(rs.next());
+        
+        conn.createStatement().execute("DROP TABLE " + DATA_TABLE_FULL_NAME);
+	}
+	
+	
+	public static class FailingRegionObserver extends SimpleRegionObserver {
+        @Override
+        public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit,
+                final Durability durability) throws HBaseIOException {
+            if (shouldFailUpsert(c, put)) {
+                // throwing anything other than instances of IOException result
+                // in this coprocessor being unloaded
+                // DoNotRetryIOException tells HBase not to retry this mutation
+                // multiple times
+                throw new DoNotRetryIOException();
+            }
+        }
+        
+        private boolean shouldFailUpsert(ObserverContext<RegionCoprocessorEnvironment> c, Put put) {
+            return Bytes.contains(put.getRow(), Bytes.toBytes(ROW_TO_FAIL));
+        }
+        
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
index 38e8ae7..52b5b5f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
@@ -10,6 +10,7 @@
 package org.apache.phoenix.tx;
 
 import static org.apache.phoenix.util.TestUtil.INDEX_DATA_SCHEMA;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.apache.phoenix.util.TestUtil.TRANSACTIONAL_DATA_TABLE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -21,15 +22,18 @@ import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
 import org.apache.phoenix.end2end.Shadower;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PTableKey;
-import org.apache.phoenix.util.DateUtil;
+import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.TestUtil;
 import org.junit.Before;
@@ -46,13 +50,6 @@ public class TransactionIT extends BaseHBaseManagedTimeIT {
     public void setUp() throws SQLException {
         ensureTableCreated(getUrl(), TRANSACTIONAL_DATA_TABLE);
     }
-
-	@BeforeClass
-    @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
-    public static void doSetup() throws Exception {
-        Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
-        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
-    }
 		
 	@Test
 	public void testReadOwnWrites() throws Exception {
@@ -248,4 +245,5 @@ public class TransactionIT extends BaseHBaseManagedTimeIT {
 		conn.createStatement().execute("ALTER TABLE " + FULL_TABLE_NAME + " SET IMMUTABLE_ROWS=true");
 		testRowConflicts();
 	}
+	
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
index 9fa69de..63d4851 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
@@ -26,21 +26,59 @@ import static org.junit.Assert.assertTrue;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.end2end.Shadower;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Maps;
 
 import co.cask.tephra.Transaction.VisibilityLevel;
 
+@RunWith(Parameterized.class)
 public class TxCheckpointIT extends BaseHBaseManagedTimeIT {
+	
+	private final boolean localIndex;
+	private final boolean mutable;
+
+	public TxCheckpointIT(boolean localIndex, boolean mutable) {
+		this.localIndex = localIndex;
+		this.mutable = mutable;
+	}
+	
+	@BeforeClass
+    @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
+    public static void doSetup() throws Exception {
+        Map<String,String> props = Maps.newHashMapWithExpectedSize(2);
+        props.put(QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB, Boolean.toString(true));
+        // We need this b/c we don't allow a transactional table to be created if the underlying
+        // HBase table already exists (since we don't know if it was transactional before).
+        props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+	
+	@Parameters(name="localIndex = {0} , mutable = {1}")
+    public static Collection<Boolean[]> data() {
+        return Arrays.asList(new Boolean[][] {     
+                 { false, false }, { false, true }, { true, false }, { true, true }  
+           });
+    }
 
-    
     @Test
     public void testUpsertSelectDoesntSeeUpsertedData() throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -50,7 +88,8 @@ public class TxCheckpointIT extends BaseHBaseManagedTimeIT {
         Connection conn = DriverManager.getConnection(getUrl(), props);
         conn.setAutoCommit(true);
         conn.createStatement().execute("CREATE SEQUENCE keys");
-        conn.createStatement().execute("CREATE TABLE txfoo (pk INTEGER PRIMARY KEY, val INTEGER) TRANSACTIONAL=true");
+        conn.createStatement().execute("CREATE TABLE txfoo (pk INTEGER PRIMARY KEY, val INTEGER)"+(!mutable? " IMMUTABLE_ROWS=true" : ""));
+        conn.createStatement().execute("CREATE "+(localIndex? "LOCAL " : "")+"INDEX idx ON txfoo (val)");
 
         conn.createStatement().execute("UPSERT INTO txfoo VALUES (NEXT VALUE FOR keys,1)");
         for (int i=0; i<6; i++) {
@@ -62,150 +101,239 @@ public class TxCheckpointIT extends BaseHBaseManagedTimeIT {
     }
     
     @Test
-    public void testCheckpointForUpsertSelect() throws Exception {
-        ResultSet rs;
+    public void testRollbackOfUncommittedDelete() throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.createStatement().execute("create table tx1 (id bigint not null primary key) TRANSACTIONAL=true");
-        conn.createStatement().execute("create table tx2 (id bigint not null primary key) TRANSACTIONAL=true");
-
-        conn.createStatement().execute("upsert into tx1 values (1)");
-        conn.createStatement().execute("upsert into tx1 values (2)");
-        conn.createStatement().execute("upsert into tx1 values (3)");
-        conn.commit();
-
-        MutationState state = conn.unwrap(PhoenixConnection.class).getMutationState();
-        state.startTransaction();
-        long wp = state.getWritePointer();
-        conn.createStatement().execute("upsert into tx1 select max(id)+1 from tx1");
-        assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel());
-        assertEquals(wp, state.getWritePointer()); // Make sure write ptr didn't move
-        rs = conn.createStatement().executeQuery("select max(id) from tx1");
-        
-        assertTrue(rs.next());
-        assertEquals(4,rs.getLong(1));
-        assertFalse(rs.next());
-        
-        conn.createStatement().execute("upsert into tx1 select max(id)+1 from tx1");
-        assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel());
-        assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr moves
-        wp = state.getWritePointer();
-        
-        conn.createStatement().execute("upsert into tx1 select id from tx2");
-        assertEquals(VisibilityLevel.SNAPSHOT, state.getVisibilityLevel());
-        // Write ptr shouldn't move b/c we're not reading from a table with uncommitted data
-        assertEquals(wp, state.getWritePointer()); 
-        
-        rs = conn.createStatement().executeQuery("select max(id) from tx1");
-        
-        assertTrue(rs.next());
-        assertEquals(5,rs.getLong(1));
-        assertFalse(rs.next());
-        
-        conn.rollback();
-        
-        rs = conn.createStatement().executeQuery("select max(id) from tx1");
-        
-        assertTrue(rs.next());
-        assertEquals(3,rs.getLong(1));
-        assertFalse(rs.next());
-
-        wp = state.getWritePointer();
-        conn.createStatement().execute("upsert into tx1 select max(id)+1 from tx1");
-        assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel());
-        assertEquals(wp, state.getWritePointer()); // Make sure write ptr didn't move
-        rs = conn.createStatement().executeQuery("select max(id) from tx1");
-        
-        assertTrue(rs.next());
-        assertEquals(4,rs.getLong(1));
-        assertFalse(rs.next());
-        
-        conn.createStatement().execute("upsert into tx1 select max(id)+1 from tx1");
-        assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel());
-        assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr moves
-        rs = conn.createStatement().executeQuery("select max(id) from tx1");
-        
-        assertTrue(rs.next());
-        assertEquals(5,rs.getLong(1));
-        assertFalse(rs.next());
-        
-        conn.commit();
-        
-        rs = conn.createStatement().executeQuery("select max(id) from tx1");
-        
-        assertTrue(rs.next());
-        assertEquals(5,rs.getLong(1));
-        assertFalse(rs.next());
-    }  
+        conn.setAutoCommit(false);
+        try {
+            Statement stmt = conn.createStatement();
+            stmt.execute("CREATE TABLE DEMO(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"+(!mutable? " IMMUTABLE_ROWS=true" : ""));
+            stmt.execute("CREATE "+(localIndex? "LOCAL " : "")+"INDEX DEMO_idx ON DEMO (v1) INCLUDE(v2)");
+            
+            stmt.executeUpdate("upsert into DEMO values('x1', 'y1', 'a1')");
+            stmt.executeUpdate("upsert into DEMO values('x2', 'y2', 'a2')");
+            
+            //assert values in data table
+            ResultSet rs = stmt.executeQuery("select k, v1, v2 from DEMO ORDER BY k");
+            assertTrue(rs.next());
+            assertEquals("x1", rs.getString(1));
+            assertEquals("y1", rs.getString(2));
+            assertEquals("a1", rs.getString(3));
+            assertTrue(rs.next());
+            assertEquals("x2", rs.getString(1));
+            assertEquals("y2", rs.getString(2));
+            assertEquals("a2", rs.getString(3));
+            assertFalse(rs.next());
+            
+            //assert values in index table
+            rs = stmt.executeQuery("select k, v1, v2 from DEMO ORDER BY v1");
+            assertTrue(rs.next());
+            assertEquals("x1", rs.getString(1));
+            assertEquals("y1", rs.getString(2));
+            assertEquals("a1", rs.getString(3));
+            assertTrue(rs.next());
+            assertEquals("x2", rs.getString(1));
+            assertEquals("y2", rs.getString(2));
+            assertEquals("a2", rs.getString(3));
+            assertFalse(rs.next());
+            
+            conn.commit();
+            
+            stmt.executeUpdate("DELETE FROM DEMO WHERE k='x1' AND v1='y1' AND v2='a1'");
+            //assert row is delete in data table
+            rs = stmt.executeQuery("select k, v1, v2 from DEMO ORDER BY k");
+            assertTrue(rs.next());
+            assertEquals("x2", rs.getString(1));
+            assertEquals("y2", rs.getString(2));
+            assertEquals("a2", rs.getString(3));
+            assertFalse(rs.next());
+            
+            //assert row is delete in index table
+            rs = stmt.executeQuery("select k, v1, v2 from DEMO ORDER BY v1");
+            assertTrue(rs.next());
+            assertEquals("x2", rs.getString(1));
+            assertEquals("y2", rs.getString(2));
+            assertEquals("a2", rs.getString(3));
+            assertFalse(rs.next());
+            
+            conn.rollback();
+            
+            //assert two rows in data table
+            rs = stmt.executeQuery("select k, v1, v2 from DEMO ORDER BY k");
+            assertTrue(rs.next());
+            assertEquals("x1", rs.getString(1));
+            assertEquals("y1", rs.getString(2));
+            assertEquals("a1", rs.getString(3));
+            assertTrue(rs.next());
+            assertEquals("x2", rs.getString(1));
+            assertEquals("y2", rs.getString(2));
+            assertEquals("a2", rs.getString(3));
+            assertFalse(rs.next());
+            
+            //assert two rows in index table
+            rs = stmt.executeQuery("select k, v1, v2 from DEMO ORDER BY v1");
+            assertTrue(rs.next());
+            assertEquals("x1", rs.getString(1));
+            assertEquals("y1", rs.getString(2));
+            assertEquals("a1", rs.getString(3));
+            assertTrue(rs.next());
+            assertEquals("x2", rs.getString(1));
+            assertEquals("y2", rs.getString(2));
+            assertEquals("a2", rs.getString(3));
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+    
+	@Test
+	public void testCheckpointForUpsertSelect() throws Exception {
+		Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+		try (Connection conn = DriverManager.getConnection(getUrl(), props);) {
+			conn.setAutoCommit(false);
+			Statement stmt = conn.createStatement();
 
-    @Test
+			stmt.execute("CREATE TABLE DEMO(ID BIGINT NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"
+					+ (!mutable ? " IMMUTABLE_ROWS=true" : ""));
+			stmt.execute("CREATE " + (localIndex ? "LOCAL " : "")
+					+ "INDEX IDX ON DEMO (v1) INCLUDE(v2)");
+
+            stmt.executeUpdate("upsert into DEMO values(1, 'a2', 'b1')");
+            stmt.executeUpdate("upsert into DEMO values(2, 'a2', 'b2')");
+            stmt.executeUpdate("upsert into DEMO values(3, 'a3', 'b3')");
+			conn.commit();
+
+			upsertRows(conn);
+			conn.rollback();
+			verifyRows(conn, 3);
+
+			upsertRows(conn);
+			conn.commit();
+			verifyRows(conn, 6);
+		}
+	}
+
+	private void verifyRows(Connection conn, int expectedMaxId) throws SQLException {
+		ResultSet rs;
+		//query the data table
+		rs = conn.createStatement().executeQuery("select /*+ NO_INDEX */ max(id) from DEMO");
+		assertTrue(rs.next());
+		assertEquals(expectedMaxId, rs.getLong(1));
+		assertFalse(rs.next());
+		
+		// query the index
+		rs = conn.createStatement().executeQuery("select /*+ INDEX(DEMO IDX) */ max(id) from DEMO");
+		assertTrue(rs.next());
+		assertEquals(expectedMaxId, rs.getLong(1));
+		assertFalse(rs.next());
+	}
+
+	private void upsertRows(Connection conn) throws SQLException {
+		ResultSet rs;
+		MutationState state = conn.unwrap(PhoenixConnection.class)
+				.getMutationState();
+		state.startTransaction();
+		long wp = state.getWritePointer();
+		conn.createStatement().execute(
+				"upsert into DEMO select max(id)+1, 'a4', 'b4' from DEMO");
+		assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT,
+				state.getVisibilityLevel());
+		assertEquals(wp, state.getWritePointer()); // Make sure write ptr
+													// didn't move
+		rs = conn.createStatement().executeQuery("select max(id) from DEMO");
+
+		assertTrue(rs.next());
+		assertEquals(4, rs.getLong(1));
+		assertFalse(rs.next());
+
+		conn.createStatement().execute(
+				"upsert into DEMO select max(id)+1, 'a5', 'b5' from DEMO");
+		assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT,
+				state.getVisibilityLevel());
+		assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr
+														// moves
+		wp = state.getWritePointer();
+		rs = conn.createStatement().executeQuery("select max(id) from DEMO");
+
+		assertTrue(rs.next());
+		assertEquals(5, rs.getLong(1));
+		assertFalse(rs.next());
+		
+		conn.createStatement().execute(
+				"upsert into DEMO select max(id)+1, 'a6', 'b6' from DEMO");
+		assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT,
+				state.getVisibilityLevel());
+		assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr
+														// moves
+		wp = state.getWritePointer();
+		rs = conn.createStatement().executeQuery("select max(id) from DEMO");
+
+		assertTrue(rs.next());
+		assertEquals(6, rs.getLong(1));
+		assertFalse(rs.next());
+	}
+	
+	@Test
     public void testCheckpointForDelete() throws Exception {
-        ResultSet rs;
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.createStatement().execute("create table tx3 (id1 bigint primary key, fk1 integer) TRANSACTIONAL=true");
-        conn.createStatement().execute("create table tx4 (id2 bigint primary key, fk2 integer) TRANSACTIONAL=true");
-
-        conn.createStatement().execute("upsert into tx3 values (1, 3)");
-        conn.createStatement().execute("upsert into tx3 values (2, 2)");
-        conn.createStatement().execute("upsert into tx3 values (3, 1)");
-        conn.createStatement().execute("upsert into tx4 values (1, 1)");
-        conn.commit();
-
-        MutationState state = conn.unwrap(PhoenixConnection.class).getMutationState();
-        state.startTransaction();
-        long wp = state.getWritePointer();
-        conn.createStatement().execute("delete from tx3 where id1=fk1");
-        assertEquals(VisibilityLevel.SNAPSHOT, state.getVisibilityLevel());
-        assertEquals(wp, state.getWritePointer()); // Make sure write ptr didn't move
-
-        rs = conn.createStatement().executeQuery("select id1 from tx3");
-        assertTrue(rs.next());
-        assertEquals(1,rs.getLong(1));
-        assertTrue(rs.next());
-        assertEquals(3,rs.getLong(1));
-        assertFalse(rs.next());
-
-        conn.createStatement().execute("delete from tx3 where id1 in (select fk1 from tx3 join tx4 on (fk2=id1))");
-        assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel());
-        assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr moved
-
-        rs = conn.createStatement().executeQuery("select id1 from tx3");
-        assertTrue(rs.next());
-        assertEquals(1,rs.getLong(1));
-        assertFalse(rs.next());
-
-        /*
-         * TODO: file Tephra JIRA, as this fails with an NPE because the
-         * ActionChange has a null family since we're issuing row deletes.
-         * See this code in TransactionAwareHTable.transactionalizeAction(Delete)
-         * and try modifying addToChangeSet(deleteRow, null, null);
-         * to modifying addToChangeSet(deleteRow, family, null);
-            } else {
-              for (Map.Entry<byte [], List<Cell>> familyEntry : familyToDelete.entrySet()) {
-                byte[] family = familyEntry.getKey();
-                List<Cell> entries = familyEntry.getValue();
-                boolean isFamilyDelete = false;
-                if (entries.size() == 1) {
-                  Cell cell = entries.get(0);
-                  isFamilyDelete = CellUtil.isDeleteFamily(cell);
-                }
-                if (isFamilyDelete) {
-                  if (conflictLevel == TxConstants.ConflictDetection.ROW ||
-                      conflictLevel == TxConstants.ConflictDetection.NONE) {
-                    // no need to identify individual columns deleted
-                    txDelete.deleteFamily(family);
-                    addToChangeSet(deleteRow, null, null);
-         */
-//        conn.rollback();
-//        rs = conn.createStatement().executeQuery("select id1 from tx3");
-//        assertTrue(rs.next());
-//        assertEquals(1,rs.getLong(1));
-//        assertTrue(rs.next());
-//        assertEquals(2,rs.getLong(1));
-//        assertTrue(rs.next());
-//        assertEquals(3,rs.getLong(1));
-//        assertFalse(rs.next());
+		Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+		ResultSet rs;
+		try (Connection conn = DriverManager.getConnection(getUrl(), props);) {
+			conn.setAutoCommit(false);
+			Statement stmt = conn.createStatement();
+			stmt.execute("CREATE TABLE DEMO1(ID1 BIGINT NOT NULL PRIMARY KEY, FK1A INTEGER, FK1B INTEGER)"
+					+ (!mutable ? " IMMUTABLE_ROWS=true" : ""));
+			stmt.execute("CREATE TABLE DEMO2(ID2 BIGINT NOT NULL PRIMARY KEY, FK2 INTEGER)"
+					+ (!mutable ? " IMMUTABLE_ROWS=true" : ""));
+			stmt.execute("CREATE " + (localIndex ? "LOCAL " : "")
+					+ "INDEX IDX ON DEMO1 (FK1B)");
+			
+			stmt.executeUpdate("upsert into DEMO1 values (1, 3, 3)");
+			stmt.executeUpdate("upsert into DEMO1 values (2, 2, 2)");
+			stmt.executeUpdate("upsert into DEMO1 values (3, 1, 1)");
+			stmt.executeUpdate("upsert into DEMO2 values (1, 1)");
+			conn.commit();
 
+	        MutationState state = conn.unwrap(PhoenixConnection.class).getMutationState();
+	        state.startTransaction();
+	        long wp = state.getWritePointer();
+	        conn.createStatement().execute("delete from DEMO1 where id1=fk1b AND fk1b=id1");
+	        assertEquals(VisibilityLevel.SNAPSHOT, state.getVisibilityLevel());
+	        assertEquals(wp, state.getWritePointer()); // Make sure write ptr didn't move
+	
+	        rs = conn.createStatement().executeQuery("select /*+ NO_INDEX */ id1 from DEMO1");
+	        assertTrue(rs.next());
+	        assertEquals(1,rs.getLong(1));
+	        assertTrue(rs.next());
+	        assertEquals(3,rs.getLong(1));
+	        assertFalse(rs.next());
+	        
+	        rs = conn.createStatement().executeQuery("select /*+ INDEX(DEMO IDX) */ id1 from DEMO1");
+	        assertTrue(rs.next());
+	        assertEquals(3,rs.getLong(1));
+	        assertTrue(rs.next());
+	        assertEquals(1,rs.getLong(1));
+	        assertFalse(rs.next());
+	
+	        conn.createStatement().execute("delete from DEMO1 where id1 in (select fk1a from DEMO1 join DEMO2 on (fk2=id1))");
+	        assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel());
+	        assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr moved
+	
+	        rs = conn.createStatement().executeQuery("select /*+ NO_INDEX */ id1 from DEMO1");
+	        assertTrue(rs.next());
+	        assertEquals(1,rs.getLong(1));
+	        assertFalse(rs.next());
+	
+	        conn.rollback();
+	        rs = conn.createStatement().executeQuery("select /*+ NO_INDEX */ id1 from DEMO1");
+	        assertTrue(rs.next());
+	        assertEquals(1,rs.getLong(1));
+	        assertTrue(rs.next());
+	        assertEquals(2,rs.getLong(1));
+	        assertTrue(rs.next());
+	        assertEquals(3,rs.getLong(1));
+	        assertFalse(rs.next());
+		}
     }  
+
+    
 }