You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2016/02/02 04:59:40 UTC

[06/50] [abbrv] phoenix git commit: PHOENIX-2478 Rows committed in transaction overlapping index creation are not populated

PHOENIX-2478 Rows committed in transaction overlapping index creation are not populated


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/3520e128
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/3520e128
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/3520e128

Branch: refs/heads/calcite
Commit: 3520e12858223dfcad343e0a3a29c71fe4d074bc
Parents: 1369937
Author: James Taylor <jt...@salesforce.com>
Authored: Tue Jan 19 08:19:44 2016 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Tue Jan 19 08:19:44 2016 -0800

----------------------------------------------------------------------
 .../phoenix/end2end/index/ImmutableIndexIT.java | 239 ++++++++++++++-----
 .../apache/phoenix/execute/MutationState.java   | 115 +++++----
 .../java/org/apache/phoenix/query/BaseTest.java | 103 +++-----
 3 files changed, 285 insertions(+), 172 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/3520e128/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
index 0d329fe..c4ecfbb 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
@@ -20,7 +20,6 @@ package org.apache.phoenix.end2end.index;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -29,12 +28,26 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
 import org.apache.phoenix.end2end.Shadower;
-import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PropertiesUtil;
@@ -47,80 +60,194 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 
 @RunWith(Parameterized.class)
 public class ImmutableIndexIT extends BaseHBaseManagedTimeIT {
-	
-	private final boolean localIndex;
-	private final String tableDDLOptions;
-	private final String tableName;
+
+    private final boolean localIndex;
+    private final String tableDDLOptions;
+    private final String tableName;
     private final String indexName;
     private final String fullTableName;
     private final String fullIndexName;
-	
-	public ImmutableIndexIT(boolean localIndex, boolean transactional) {
-		this.localIndex = localIndex;
-		StringBuilder optionBuilder = new StringBuilder("IMMUTABLE_ROWS=true");
-		if (transactional) {
-			optionBuilder.append(", TRANSACTIONAL=true");
-		}
-		this.tableDDLOptions = optionBuilder.toString();
-		this.tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + ( transactional ?  "_TXN" : "");
+
+    private static String TABLE_NAME;
+    private static String INDEX_DDL;
+    public static final AtomicInteger NUM_ROWS = new AtomicInteger(1);
+
+    public ImmutableIndexIT(boolean localIndex, boolean transactional) {
+        this.localIndex = localIndex;
+        StringBuilder optionBuilder = new StringBuilder("IMMUTABLE_ROWS=true");
+        if (transactional) {
+            optionBuilder.append(", TRANSACTIONAL=true");
+        }
+        this.tableDDLOptions = optionBuilder.toString();
+        this.tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + ( transactional ?  "_TXN" : "");
         this.indexName = "IDX" + ( transactional ?  "_TXN" : "");
         this.fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
         this.fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
-	}
-	
-	@BeforeClass
+    }
+
+    @BeforeClass
     @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
     public static void doSetup() throws Exception {
-        Map<String,String> props = Maps.newHashMapWithExpectedSize(1);
-        props.put(QueryServices.TRANSACTIONS_ENABLED, Boolean.toString(true));
-        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(1);
+        serverProps.put("hbase.coprocessor.region.classes", CreateIndexRegionObserver.class.getName());
+        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
+        clientProps.put(QueryServices.TRANSACTIONS_ENABLED, "true");
+        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
     }
-	
-	@Parameters(name="localIndex = {0} , transactional = {1}")
+
+    @Parameters(name="localIndex = {0} , transactional = {1}")
     public static Collection<Boolean[]> data() {
         return Arrays.asList(new Boolean[][] {     
-                 { false, false }, { false, true }, { true, false }, { true, true }
-           });
+            { false, true }, { true, true }
+        });
     }
-   
+
+
     @Test
-    public void testDropIfImmutableKeyValueColumn() throws Exception {
-    	Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+    public void testCreateIndexDuringUpsertSelect() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, Integer.toString(100));
+        TABLE_NAME = fullTableName + "_testCreateIndexDuringUpsertSelect";
+        String ddl ="CREATE TABLE " + TABLE_NAME + BaseTest.TEST_TABLE_SCHEMA + tableDDLOptions;
+        INDEX_DDL = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX IF NOT EXISTS " + indexName + " ON " + TABLE_NAME
+                + " (long_pk, varchar_pk)"
+                + " INCLUDE (long_col1, long_col2)";
+
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            conn.setAutoCommit(false);
+            Statement stmt = conn.createStatement();
+            stmt.execute(ddl);
+
+            upsertRows(conn, TABLE_NAME, 220);
+            conn.commit();
+
+            // run the upsert select and also create an index
+            conn.setAutoCommit(true);
+            String upsertSelect = "UPSERT INTO " + TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) " + 
+                    "SELECT varchar_pk||'_upsert_select', char_pk, int_pk, long_pk, decimal_pk, date_pk FROM "+ TABLE_NAME;    
+            conn.createStatement().execute(upsertSelect);
+
+            ResultSet rs;
+            rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ COUNT(*) FROM " + TABLE_NAME);
+            assertTrue(rs.next());
+            assertEquals(440,rs.getInt(1));
+            rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + TABLE_NAME);
+            assertTrue(rs.next());
+            assertEquals(440,rs.getInt(1));
+        }
+        finally {
+            conn.close();
+        }
+    }
+
+    // used to create an index while a batch of rows is being written
+    public static class CreateIndexRegionObserver extends SimpleRegionObserver {
+        @Override
+        public void postPut(ObserverContext<RegionCoprocessorEnvironment> c,
+                Put put, WALEdit edit, final Durability durability)
+                        throws HBaseIOException {
+            String tableName = c.getEnvironment().getRegion().getRegionInfo()
+                    .getTable().getNameAsString();
+            if (tableName.equalsIgnoreCase(TABLE_NAME)
+                    // create the index once the upsert select reaches row "varchar200..." (the test sets the mutate batch size to 100)
+                    && Bytes.startsWith(put.getRow(), Bytes.toBytes("varchar200_upsert_select"))) {
+                try {
+                    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+                    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+                        conn.createStatement().execute(INDEX_DDL);
+                    }
+                } catch (SQLException e) {
+                    throw new DoNotRetryIOException(e);
+                } 
+            }
+        }
+    }
+
+    private static class UpsertRunnable implements Runnable {
+        private static final int NUM_ROWS_IN_BATCH = 10000;
+        private final String fullTableName;
+
+        public UpsertRunnable(String fullTableName) {
+            this.fullTableName = fullTableName;
+        }
+
+        public void run() {
+            Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+            try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+                while (true) {
+                    // write a large batch of rows
+                    boolean firstRowInBatch = true;
+                    for (int i=0; i<NUM_ROWS_IN_BATCH; ++i) {
+                        // claim the row index atomically: several writers share NUM_ROWS and must not reuse an index
+                        BaseTest.upsertRow(conn, fullTableName, NUM_ROWS.getAndIncrement(), firstRowInBatch);
+                        firstRowInBatch = false;
+                    }
+                    conn.commit();
+                    Thread.sleep(500);
+                }
+            } catch (SQLException e) {
+                throw new RuntimeException(e);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    @Test
+    public void testCreateIndexWhileUpsertingData() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String ddl ="CREATE TABLE " + fullTableName + BaseTest.TEST_TABLE_SCHEMA + tableDDLOptions;
+        String indexDDL = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX IF NOT EXISTS " + indexName + " ON " + fullTableName
+                + " (long_pk, varchar_pk)"
+                + " INCLUDE (long_col1, long_col2)";
+        int numThreads = 3;
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-	        conn.setAutoCommit(false);
-	        String ddl ="CREATE TABLE " + fullTableName + BaseTest.TEST_TABLE_SCHEMA + tableDDLOptions;
-	        Statement stmt = conn.createStatement();
-	        stmt.execute(ddl);
-	        populateTestTable(fullTableName);
-	        ddl = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName + " ON " + fullTableName + " (long_col1)";
-	        stmt.execute(ddl);
-	        
-	        ResultSet rs;
-	        
-	        rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullTableName);
-	        assertTrue(rs.next());
-	        assertEquals(3,rs.getInt(1));
-	        rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullIndexName);
-	        assertTrue(rs.next());
-	        assertEquals(3,rs.getInt(1));
-	        
-	        conn.setAutoCommit(true);
-	        String dml = "DELETE from " + fullTableName + " WHERE long_col2 = 4";
-	        try {
-	            conn.createStatement().execute(dml);
-	            fail();
-	        } catch (SQLException e) {
-	            assertEquals(SQLExceptionCode.INVALID_FILTER_ON_IMMUTABLE_ROWS.getErrorCode(), e.getErrorCode());
-	        }
-	            
-	        conn.createStatement().execute("DROP TABLE " + fullTableName);
+            conn.setAutoCommit(false);
+            Statement stmt = conn.createStatement();
+            stmt.execute(ddl);
+
+            ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
+            List<Future<?>> futureList = Lists.newArrayListWithExpectedSize(numThreads);
+            for (int i =0; i<numThreads; ++i) {
+                futureList.add(threadPool.submit(new UpsertRunnable(fullTableName)));
+            }
+            // upsert some rows before creating the index 
+            Thread.sleep(5000);
+
+            // create the index 
+            try (Connection conn2 = DriverManager.getConnection(getUrl(), props)) {
+                conn2.setAutoCommit(false);
+                Statement stmt2 = conn2.createStatement();
+                stmt2.execute(indexDDL);
+                conn2.commit();
+            }
+
+            // upsert some rows after creating the index
+            Thread.sleep(1000);
+            // cancel the running threads
+            for (Future<?> future : futureList) {
+                future.cancel(true);
+            }
+            threadPool.shutdownNow();
+            threadPool.awaitTermination(30, TimeUnit.SECONDS);
+            Thread.sleep(1000);
+
+            ResultSet rs;
+            rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ COUNT(*) FROM " + fullTableName);
+            assertTrue(rs.next());
+            int dataTableRowCount = rs.getInt(1);
+            rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullIndexName);
+            assertTrue(rs.next());
+            int indexTableRowCount = rs.getInt(1);
+            assertEquals("Data and Index table should have the same number of rows ", dataTableRowCount, indexTableRowCount);
         }
     }
-    
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3520e128/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index a6fe98d..8caac5d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -135,7 +135,7 @@ public class MutationState implements SQLCloseable {
     private int numRows = 0;
     private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
     private boolean isExternalTxContext = false;
-    private Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations;
+    private Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations = Collections.emptyMap();
     
     private final MutationMetricQueue mutationMetricQueue;
     private ReadMetricQueue readMetricQueue;
@@ -435,6 +435,59 @@ public class MutationState implements SQLCloseable {
         return sizeOffset + numRows;
     }
     
+    private void joinMutationState(TableRef tableRef, Map<ImmutableBytesPtr,RowMutationState> srcRows,
+            Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> dstMutations) {
+        PTable table = tableRef.getTable();
+        boolean isIndex = table.getType() == PTableType.INDEX;
+        boolean incrementRowCount = dstMutations == this.mutations;
+        Map<ImmutableBytesPtr,RowMutationState> existingRows = dstMutations.put(tableRef, srcRows);
+        if (existingRows != null) { // Rows for that table already exist
+            // Loop through new rows and replace existing with new
+            for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : srcRows.entrySet()) {
+                // Replace existing row with new row
+                RowMutationState existingRowMutationState = existingRows.put(rowEntry.getKey(), rowEntry.getValue());
+                if (existingRowMutationState != null) {
+                    Map<PColumn,byte[]> existingValues = existingRowMutationState.getColumnValues();
+                    if (existingValues != PRow.DELETE_MARKER) {
+                        Map<PColumn,byte[]> newRow = rowEntry.getValue().getColumnValues();
+                        // if new row is PRow.DELETE_MARKER, it means delete, and we don't need to merge it with existing row. 
+                        if (newRow != PRow.DELETE_MARKER) {
+                            // Merge existing column values with new column values
+                            existingRowMutationState.join(rowEntry.getValue());
+                            // Now that the existing row has been merged with the new row, replace it back
+                            // again (since it was merged with the new one above).
+                            existingRows.put(rowEntry.getKey(), existingRowMutationState);
+                        }
+                    }
+                } else {
+                    if (incrementRowCount && !isIndex) { // Don't count index rows in row count
+                        numRows++;
+                    }
+                }
+            }
+            // Put the existing one back now that it's merged
+            dstMutations.put(tableRef, existingRows);
+        } else {
+            // Size new map at batch size as that's what it'll likely grow to.
+            Map<ImmutableBytesPtr,RowMutationState> newRows = Maps.newHashMapWithExpectedSize(connection.getMutateBatchSize());
+            newRows.putAll(srcRows);
+            dstMutations.put(tableRef, newRows);
+            if (incrementRowCount && !isIndex) {
+                numRows += srcRows.size();
+            }
+        }
+    }
+    
+    private void joinMutationState(Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> srcMutations, 
+            Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> dstMutations) {
+        // Merge newMutation with this one, keeping state from newMutation for any overlaps
+        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : srcMutations.entrySet()) {
+            // Replace existing entries for the table with new entries
+            TableRef tableRef = entry.getKey();
+            Map<ImmutableBytesPtr,RowMutationState> srcRows = entry.getValue();
+            joinMutationState(tableRef, srcRows, dstMutations);
+        }
+    }
     /**
      * Combine a newer mutation with this one, where in the event of overlaps, the newer one will take precedence.
      * Combine any metrics collected for the newer mutation.
@@ -453,48 +506,12 @@ public class MutationState implements SQLCloseable {
             txAwares.addAll(newMutationState.txAwares);
         }
         this.sizeOffset += newMutationState.sizeOffset;
-        // Merge newMutation with this one, keeping state from newMutation for any overlaps
-        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : newMutationState.mutations.entrySet()) {
-            // Replace existing entries for the table with new entries
-            TableRef tableRef = entry.getKey();
-            PTable table = tableRef.getTable();
-            boolean isIndex = table.getType() == PTableType.INDEX;
-            Map<ImmutableBytesPtr,RowMutationState> existingRows = this.mutations.put(tableRef, entry.getValue());
-            if (existingRows != null) { // Rows for that table already exist
-                // Loop through new rows and replace existing with new
-                for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : entry.getValue().entrySet()) {
-                    // Replace existing row with new row
-                    RowMutationState existingRowMutationState = existingRows.put(rowEntry.getKey(), rowEntry.getValue());
-                    if (existingRowMutationState != null) {
-                        Map<PColumn,byte[]> existingValues = existingRowMutationState.getColumnValues();
-                        if (existingValues != PRow.DELETE_MARKER) {
-                            Map<PColumn,byte[]> newRow = rowEntry.getValue().getColumnValues();
-                            // if new row is PRow.DELETE_MARKER, it means delete, and we don't need to merge it with existing row. 
-                            if (newRow != PRow.DELETE_MARKER) {
-                                // Merge existing column values with new column values
-                                existingRowMutationState.join(rowEntry.getValue());
-                                // Now that the existing row has been merged with the new row, replace it back
-                                // again (since it was merged with the new one above).
-                                existingRows.put(rowEntry.getKey(), existingRowMutationState);
-                            }
-                        }
-                    } else {
-                        if (!isIndex) { // Don't count index rows in row count
-                            numRows++;
-                        }
-                    }
-                }
-                // Put the existing one back now that it's merged
-                this.mutations.put(entry.getKey(), existingRows);
-            } else {
-                // Size new map at batch size as that's what it'll likely grow to.
-                Map<ImmutableBytesPtr,RowMutationState> newRows = Maps.newHashMapWithExpectedSize(connection.getMutateBatchSize());
-                newRows.putAll(entry.getValue());
-                this.mutations.put(tableRef, newRows);
-                if (!isIndex) {
-                    numRows += entry.getValue().size();
-                }
+        joinMutationState(newMutationState.mutations, this.mutations);
+        if (!newMutationState.txMutations.isEmpty()) {
+            if (txMutations.isEmpty()) {
+                txMutations = Maps.newHashMapWithExpectedSize(mutations.size());
             }
+            joinMutationState(newMutationState.txMutations, this.txMutations);
         }
         mutationMetricQueue.combineMetricQueues(newMutationState.mutationMetricQueue);
         if (readMetricQueue == null) {
@@ -915,6 +932,7 @@ public class MutationState implements SQLCloseable {
                             long startTime = System.currentTimeMillis();
                             child.addTimelineAnnotation("Attempt " + retryCount);
                             hTable.batch(mutationList);
+                            if (logger.isDebugEnabled()) logger.debug("Sent batch of " + numMutations + " for " + Bytes.toString(htableName));
                             child.stop();
                             child.stop();
                             shouldRetry = false;
@@ -980,13 +998,13 @@ public class MutationState implements SQLCloseable {
                 // committed in the event of a failure.
                 if (isTransactional) {
                     addUncommittedStatementIndexes(valuesMap.values());
-                    if (txMutations == null) {
+                    if (txMutations.isEmpty()) {
                         txMutations = Maps.newHashMapWithExpectedSize(mutations.size());
                     }
                     // Keep all mutations we've encountered until a commit or rollback.
                     // This is not ideal, but there's not good way to get the values back
                     // in the event that we need to replay the commit.
-                    txMutations.put(tableRef, valuesMap);
+                    joinMutationState(tableRef, valuesMap, txMutations);
                 }
                 // Remove batches as we process them
                 if (sendAll) {
@@ -1082,7 +1100,7 @@ public class MutationState implements SQLCloseable {
     private void resetTransactionalState() {
         tx = null;
         txAwares.clear();
-        txMutations = null;
+        txMutations = Collections.emptyMap();
         uncommittedPhysicalNames.clear();
         uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
     }
@@ -1187,9 +1205,7 @@ public class MutationState implements SQLCloseable {
                 break;
             }
             retryCount++;
-            if (txMutations != null) {
-                mutations.putAll(txMutations);
-            }
+            mutations.putAll(txMutations);
         } while (true);
     }
 
@@ -1214,6 +1230,7 @@ public class MutationState implements SQLCloseable {
             if (result.getTable() == null) {
                 throw new TableNotFoundException(dataTable.getSchemaName().getString(), dataTable.getTableName().getString());
             }
+            tableRef.setTable(result.getTable());
             if (!result.wasUpdated()) {
                 if (logger.isInfoEnabled()) logger.info("No updates to " + dataTable.getName().getString() + " as of "  + timestamp);
                 continue;
@@ -1223,7 +1240,7 @@ public class MutationState implements SQLCloseable {
                 // that an index was dropped and recreated with the same name but different
                 // indexed/covered columns.
                 addedIndexes = (!oldIndexes.equals(result.getTable().getIndexes()));
-                if (logger.isInfoEnabled()) logger.info((addedIndexes ? "Updates " : "No updates ") + "as of "  + timestamp + " to " + dataTable.getName().getString() + " with indexes " + dataTable.getIndexes());
+                if (logger.isInfoEnabled()) logger.info((addedIndexes ? "Updates " : "No updates ") + "as of "  + timestamp + " to " + dataTable.getName().getString() + " with indexes " + tableRef.getTable().getIndexes());
             }
         }
         if (logger.isInfoEnabled()) logger.info((addedIndexes ? "Updates " : "No updates ") + "to indexes as of "  + getInitialWritePointer());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3520e128/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 35bb8ce..951bfce 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.query;
 
+import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
 import static org.apache.phoenix.util.PhoenixRuntime.CURRENT_SCN_ATTRIB;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
@@ -150,6 +151,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.hbase.index.balancer.IndexLoadBalancer;
 import org.apache.phoenix.hbase.index.master.IndexMasterObserver;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.jdbc.PhoenixDriver;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
 import org.apache.phoenix.jdbc.PhoenixTestDriver;
@@ -200,7 +202,7 @@ import com.google.inject.util.Providers;
 public abstract class BaseTest {
     protected static final String TEST_TABLE_SCHEMA = "(" +
             "   varchar_pk VARCHAR NOT NULL, " +
-            "   char_pk CHAR(6) NOT NULL, " +
+            "   char_pk CHAR(10) NOT NULL, " +
             "   int_pk INTEGER NOT NULL, "+ 
             "   long_pk BIGINT NOT NULL, " +
             "   decimal_pk DECIMAL(31, 10) NOT NULL, " +
@@ -1805,77 +1807,44 @@ public abstract class BaseTest {
     public HBaseTestingUtility getUtility() {
         return utility;
     }
+    
+    public static void upsertRows(Connection conn, String fullTableName, int numRows) throws SQLException {
+    	for (int i=1; i<=numRows; ++i) {
+	        upsertRow(conn, fullTableName, i, false);
+    	}
+    }
+
+    public static void upsertRow(Connection conn, String fullTableName, int index, boolean firstRowInBatch) throws SQLException {
+    	String upsert = "UPSERT INTO " + fullTableName
+                + " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+		PreparedStatement stmt = conn.prepareStatement(upsert);
+		stmt.setString(1, (firstRowInBatch ? "firstRowInBatch_" : "") + "varchar" + index); // parenthesized: '+' binds tighter than '?:'
+		stmt.setString(2, "char"+index);
+		stmt.setInt(3, index);
+		stmt.setLong(4, index);
+		stmt.setBigDecimal(5, new BigDecimal(index));
+		Date date = DateUtil.parseDate("2015-01-01 00:00:00");
+		stmt.setDate(6, date);
+		stmt.setString(7, "varchar_a");
+		stmt.setString(8, "chara");
+		stmt.setInt(9, index+1);
+		stmt.setLong(10, index+1);
+		stmt.setBigDecimal(11, new BigDecimal(index+1));
+		stmt.setDate(12, date);
+		stmt.setString(13, "varchar_b");
+		stmt.setString(14, "charb");
+		stmt.setInt(15, index+2);
+		stmt.setLong(16, index+2);
+		stmt.setBigDecimal(17, new BigDecimal(index+2));
+		stmt.setDate(18, date);
+		stmt.executeUpdate();
+	}
 
     // Populate the test table with data.
     public static void populateTestTable(String fullTableName) throws SQLException {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-            String upsert = "UPSERT INTO " + fullTableName
-                    + " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
-            PreparedStatement stmt = conn.prepareStatement(upsert);
-            stmt.setString(1, "varchar1");
-            stmt.setString(2, "char1");
-            stmt.setInt(3, 1);
-            stmt.setLong(4, 1L);
-            stmt.setBigDecimal(5, new BigDecimal(1.0));
-            Date date = DateUtil.parseDate("2015-01-01 00:00:00");
-            stmt.setDate(6, date);
-            stmt.setString(7, "varchar_a");
-            stmt.setString(8, "chara");
-            stmt.setInt(9, 2);
-            stmt.setLong(10, 2L);
-            stmt.setBigDecimal(11, new BigDecimal(2.0));
-            stmt.setDate(12, date);
-            stmt.setString(13, "varchar_b");
-            stmt.setString(14, "charb");
-            stmt.setInt(15, 3);
-            stmt.setLong(16, 3L);
-            stmt.setBigDecimal(17, new BigDecimal(3.0));
-            stmt.setDate(18, date);
-            stmt.executeUpdate();
-            
-            stmt.setString(1, "varchar2");
-            stmt.setString(2, "char2");
-            stmt.setInt(3, 2);
-            stmt.setLong(4, 2L);
-            stmt.setBigDecimal(5, new BigDecimal(2.0));
-            date = DateUtil.parseDate("2015-01-02 00:00:00");
-            stmt.setDate(6, date);
-            stmt.setString(7, "varchar_a");
-            stmt.setString(8, "chara");
-            stmt.setInt(9, 3);
-            stmt.setLong(10, 3L);
-            stmt.setBigDecimal(11, new BigDecimal(3.0));
-            stmt.setDate(12, date);
-            stmt.setString(13, "varchar_b");
-            stmt.setString(14, "charb");
-            stmt.setInt(15, 4);
-            stmt.setLong(16, 4L);
-            stmt.setBigDecimal(17, new BigDecimal(4.0));
-            stmt.setDate(18, date);
-            stmt.executeUpdate();
-            
-            stmt.setString(1, "varchar3");
-            stmt.setString(2, "char3");
-            stmt.setInt(3, 3);
-            stmt.setLong(4, 3L);
-            stmt.setBigDecimal(5, new BigDecimal(3.0));
-            date = DateUtil.parseDate("2015-01-03 00:00:00");
-            stmt.setDate(6, date);
-            stmt.setString(7, "varchar_a");
-            stmt.setString(8, "chara");
-            stmt.setInt(9, 4);
-            stmt.setLong(10, 4L);
-            stmt.setBigDecimal(11, new BigDecimal(4.0));
-            stmt.setDate(12, date);
-            stmt.setString(13, "varchar_b");
-            stmt.setString(14, "charb");
-            stmt.setInt(15, 5);
-            stmt.setLong(16, 5L);
-            stmt.setBigDecimal(17, new BigDecimal(5.0));
-            stmt.setDate(18, date);
-            stmt.executeUpdate();
-            
+        	upsertRows(conn, fullTableName, 3);
             conn.commit();
         }
     }