Posted to commits@phoenix.apache.org by je...@apache.org on 2014/08/08 23:08:48 UTC

git commit: PHOENIX-1147: Ensure data table is sent to client if index table changes states

Repository: phoenix
Updated Branches:
  refs/heads/3.0 fa72e44a6 -> 0272e2841


PHOENIX-1147: Ensure data table is sent to client if index table changes states


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/0272e284
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/0272e284
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/0272e284

Branch: refs/heads/3.0
Commit: 0272e2841b08d40c3bfda6bbdf401ff0f9c40c90
Parents: fa72e44
Author: Jeffrey Zhong <je...@apache.org>
Authored: Tue Aug 5 14:55:57 2014 -0700
Committer: Jeffrey Zhong <je...@apache.org>
Committed: Fri Aug 8 14:08:00 2014 -0700

----------------------------------------------------------------------
 .../end2end/index/MutableIndexFailureIT.java    | 201 ++++++++++++++++++-
 .../coprocessor/MetaDataEndpointImpl.java       |  23 ++-
 .../coprocessor/MetaDataRegionObserver.java     |  22 +-
 .../apache/phoenix/execute/MutationState.java   |   4 +-
 .../index/PhoenixIndexFailurePolicy.java        |   8 +-
 .../apache/phoenix/schema/MetaDataClient.java   |   6 +-
 .../org/apache/phoenix/schema/PTableImpl.java   |   6 +-
 .../org/apache/phoenix/schema/TableRef.java     |   6 +-
 8 files changed, 248 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
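
In brief, the fix works as follows: when updateIndexState() transitions an
index (for example, from ACTIVE to DISABLE after a write failure), the server
now also writes an empty KeyValue at the new timestamp into the data table's
header row in SYSTEM.CATALOG and invalidates the data table's entry in the
server-side metadata cache. Because clients resolve tables by timestamp, the
bumped header row makes the client's cached data table stale, so the next
metadata call ships the updated table whose index list reflects the new state.
A minimal sketch of the server-side pattern, with names taken from the
MetaDataEndpointImpl hunk below (dataTableName stands for the value of the
DATA_TABLE_NAME cell read there):

    // bump the data table header row so clients re-fetch the table
    byte[] dataTableKey = SchemaUtil.getTableKey(tenantId, schemaName, dataTableName);
    Put p = new Put(dataTableKey);
    p.add(TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, timeStamp, ByteUtil.EMPTY_BYTE_ARRAY);
    tableMetadata.add(p);
    region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]>emptySet());
    // drop the now-stale server-side cache entries
    metaDataCache.invalidate(cacheKey);
    metaDataCache.invalidate(new ImmutableBytesPtr(dataTableKey));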


http://git-wip-us.apache.org/repos/asf/phoenix/blob/0272e284/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index 8fa8e20..bbf8a6b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -27,30 +27,65 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.sql.Connection;
+import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.Properties;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseCluster;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixTestDriver;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.StringUtil;
+import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -65,32 +100,39 @@ import org.junit.experimental.categories.Category;
  */
 @Category(NeedsOwnMiniClusterTest.class)
 public class MutableIndexFailureIT extends BaseTest {
+    private static final int NUM_SLAVES = 4;
     private static String url;
     private static PhoenixTestDriver driver;
     private static HBaseTestingUtility util;
+    private Timer scheduleTimer;
 
-    private static final String SCHEMA_NAME = "";
+    private static final String SCHEMA_NAME = "S";
     private static final String INDEX_TABLE_NAME = "I";
     private static final String DATA_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, "T");
     private static final String INDEX_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, "I");
 
-    @BeforeClass
-    public static void doSetup() throws Exception {
+    @Before
+    public void doSetup() throws Exception {
         Configuration conf = HBaseConfiguration.create();
         setUpConfigForMiniCluster(conf);
         conf.setInt("hbase.client.retries.number", 2);
         conf.setInt("hbase.client.pause", 5000);
+        conf.setInt("hbase.balancer.period", Integer.MAX_VALUE);
         conf.setLong(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB, 0);
         util = new HBaseTestingUtility(conf);
-        util.startMiniCluster();
+        util.startMiniCluster(NUM_SLAVES);
         String clientPort = util.getConfiguration().get(QueryServices.ZOOKEEPER_PORT_ATTRIB);
         url = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST + JDBC_PROTOCOL_SEPARATOR + clientPort
                 + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
         driver = initAndRegisterDriver(url, ReadOnlyProps.EMPTY_PROPS);
     }
 
-    @AfterClass
-    public static void tearDown() throws Exception {
+    @After
+    public void tearDown() throws Exception {
+        if(scheduleTimer != null){
+            scheduleTimer.cancel();
+            scheduleTimer = null;
+        }
         try {
             destroyDriver(driver);
         } finally {
@@ -158,6 +200,7 @@ public class MutableIndexFailureIT extends BaseTest {
         assertEquals(PIndexState.DISABLE.toString(), rs.getString("INDEX_STATE"));
         assertFalse(rs.next());
         
+        // Verify UPSERT on data table still works after the index is disabled
         stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
         stmt.setString(1, "a3");
         stmt.setString(2, "x3");
@@ -165,11 +208,18 @@ public class MutableIndexFailureIT extends BaseTest {
         stmt.execute();
         conn.commit();
         
+        query = "SELECT v2 FROM " + DATA_TABLE_FULL_NAME + " where v1='x3'";
+        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+        assertTrue(QueryUtil.getExplainPlan(rs).contains("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + DATA_TABLE_FULL_NAME));
+        rs = conn.createStatement().executeQuery(query);
+        assertTrue(rs.next());
+        
         // recreate index table
         admin.createTable(indexTableDesc);
         do {
           Thread.sleep(15 * 1000); // sleep 15 secs
-          rs = conn.getMetaData().getTables(null, "", INDEX_TABLE_NAME, new String[] {PTableType.INDEX.toString()});
+          rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
+              new String[] { PTableType.INDEX.toString() });
           assertTrue(rs.next());
           if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
               break;
@@ -181,8 +231,143 @@ public class MutableIndexFailureIT extends BaseTest {
         rs = conn.createStatement().executeQuery(query);
         assertTrue(rs.next());
         
-        // using 2 here because we onluy partially build index from where we failed and the oldest 
+        // using 2 here because we only partially build index from where we failed and the oldest 
         // index row has been deleted when we dropped the index table during test.
         assertEquals(2, rs.getInt(1));
     }
+    
+    @Test(timeout=300000)
+    public void testWriteFailureWithRegionServerDown() throws Exception {
+        String query;
+        ResultSet rs;
+
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = driver.connect(url, props);
+        conn.setAutoCommit(false);
+        conn.createStatement().execute(
+                "CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+        query = "SELECT * FROM " + DATA_TABLE_FULL_NAME;
+        rs = conn.createStatement().executeQuery(query);
+        assertFalse(rs.next());
+
+        conn.createStatement().execute(
+                "CREATE INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)");
+        query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME;
+        rs = conn.createStatement().executeQuery(query);
+        assertFalse(rs.next());
+
+        // Verify the metadata for the index is correct.
+        rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
+                new String[] { PTableType.INDEX.toString() });
+        assertTrue(rs.next());
+        assertEquals(INDEX_TABLE_NAME, rs.getString(3));
+        assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
+        assertFalse(rs.next());
+        
+        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
+        stmt.setString(1, "a");
+        stmt.setString(2, "x");
+        stmt.setString(3, "1");
+        stmt.execute();
+        conn.commit();
+        
+        // find a RS which doesn't have the CATALOG table
+        byte[] catalogTable = Bytes.toBytes("SYSTEM.CATALOG");
+        byte[] indexTable = Bytes.toBytes(INDEX_TABLE_FULL_NAME);
+        final HBaseCluster cluster = this.util.getHBaseCluster();
+        Collection<ServerName> rss = cluster.getClusterStatus().getServers();
+        HBaseAdmin admin = this.util.getHBaseAdmin();
+        List<HRegionInfo> regions = admin.getTableRegions(catalogTable);
+        ServerName catalogRS = cluster.getServerHoldingRegion(regions.get(0).getRegionName());
+        ServerName metaRS = cluster.getServerHoldingMeta();
+        ServerName rsToBeKilled = null;
+        
+        // find the first RS that isn't holding the META or CATALOG table
+        for(ServerName curRS : rss) {
+            if(!curRS.equals(catalogRS) && !metaRS.equals(curRS)) {
+                rsToBeKilled = curRS;
+                break;
+            }
+        }
+        assertTrue(rsToBeKilled != null);
+        
+        regions = admin.getTableRegions(indexTable);
+        final HRegionInfo indexRegion = regions.get(0);
+        final ServerName dstRS = rsToBeKilled;
+        admin.move(indexRegion.getEncodedNameAsBytes(), Bytes.toBytes(rsToBeKilled.getServerName()));
+        
+        long started = System.currentTimeMillis();
+        while(true) {
+            ServerName sn = cluster.getServerHoldingRegion(indexRegion.getRegionName());
+            if (sn != null && sn.equals(dstRS)) {
+                break;
+            }
+            if((System.currentTimeMillis() - started) > 30000) {
+                assertTrue("Timeout waiting for " + indexRegion + " to move to " + rsToBeKilled, false);
+            }
+            Thread.sleep(200);
+        }
+
+        // use a timer to send updates every 10ms
+        this.scheduleTimer = new Timer(true);
+        this.scheduleTimer.schedule(new SendingUpdatesScheduleTask(conn), 0, 10);
+        // let the timer send some updates
+        Thread.sleep(100);
+        
+        // kill the RS hosting the index table
+        this.util.getHBaseCluster().killRegionServer(rsToBeKilled);
+        
+        // wait for the index table to complete recovery
+        this.util.waitUntilAllRegionsAssigned(indexTable);
+        
+        // Verify the metadata for the index is correct.
+        do {
+          Thread.sleep(15 * 1000); // sleep 15 secs
+          rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
+              new String[] { PTableType.INDEX.toString() });
+          assertTrue(rs.next());
+          if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
+              break;
+          }
+        } while(true);
+        this.scheduleTimer.cancel();
+        
+        assertEquals(1, cluster.getClusterStatus().getDeadServers());
+    }
+    
+    static class SendingUpdatesScheduleTask extends TimerTask {
+        private static final Log LOG = LogFactory.getLog(SendingUpdatesScheduleTask.class);
+        
+        // inProgress prevents the timer from invoking a new task while the previous
+        // one is still running
+        private final static AtomicInteger inProgress = new AtomicInteger(0);
+        private final Connection conn;
+        private int inserts = 0;
+
+        public SendingUpdatesScheduleTask(Connection conn) {
+            this.conn = conn;
+        }
+
+        public void run() {
+            if(inProgress.get() > 0){
+                return;
+            }
+            
+            try {
+                inProgress.incrementAndGet();
+                inserts++;
+                PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
+                stmt.setString(1, "a" + inserts);
+                stmt.setString(2, "x" + inserts);
+                stmt.setString(3, String.valueOf(inserts));
+                stmt.execute();
+                conn.commit();
+            } catch (Throwable t) {
+                LOG.warn("SendingUpdatesScheduleTask failed!", t);
+            } finally {
+                inProgress.decrementAndGet();
+            }
+        }
+    }
+    
 }
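
A note on the polling loops above: both do/while loops spin until the index
returns to ACTIVE and rely on @Test(timeout=300000) to bound them. A bounded
helper (a sketch only, not part of this commit; fail() would come from
org.junit.Assert) could replace them:

    private static void waitForIndexState(Connection conn, PIndexState expected,
            long timeoutMs) throws Exception {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            ResultSet rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME),
                    INDEX_TABLE_NAME, new String[] { PTableType.INDEX.toString() });
            assertTrue(rs.next());
            if (expected.toString().equals(rs.getString("INDEX_STATE"))) {
                return; // reached the expected state
            }
            Thread.sleep(1000);
        }
        fail("Timed out waiting for index state " + expected);
    }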

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0272e284/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 09625ec..aae98b2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -69,6 +69,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
@@ -150,6 +151,7 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
     private static final KeyValue MULTI_TENANT_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, MULTI_TENANT_BYTES);
     private static final KeyValue VIEW_TYPE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_TYPE_BYTES);
     private static final KeyValue VIEW_INDEX_ID_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_INDEX_ID_BYTES);
+    private static final KeyValue INDEX_DISABLE_TIMESTAMP_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES);
     private static final List<KeyValue> TABLE_KV_COLUMNS = Arrays.<KeyValue>asList(
             TABLE_TYPE_KV,
             TABLE_SEQ_NUM_KV,
@@ -164,7 +166,8 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
             DISABLE_WAL_KV,
             MULTI_TENANT_KV,
             VIEW_TYPE_KV,
-            VIEW_INDEX_ID_KV
+            VIEW_INDEX_ID_KV,
+            INDEX_DISABLE_TIMESTAMP_KV
             );
     static {
         Collections.sort(TABLE_KV_COLUMNS, KeyValue.COMPARATOR);
@@ -374,6 +377,7 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
                 tableKeyValues[j++] = kv;
                 i++;
             } else if (cmp > 0) {
+                timeStamp = Math.max(timeStamp, kv.getTimestamp()); 
                 tableKeyValues[j++] = null;
             } else {
                 i++; // shouldn't happen - means unexpected KV in system table header row
@@ -1138,6 +1142,7 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
             try {
                 Get get = new Get(key);
                 get.setTimeRange(PTable.INITIAL_SEQ_NUM, timeStamp);
+                get.addColumn(TABLE_FAMILY_BYTES, DATA_TABLE_NAME_BYTES);
                 get.addColumn(TABLE_FAMILY_BYTES, INDEX_STATE_BYTES);
                 get.addColumn(TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES);
                 Result currentResult = region.get(get);
@@ -1146,6 +1151,7 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
                 }
                 KeyValue currentStateKV = currentResult.getColumnLatest(TABLE_FAMILY_BYTES, INDEX_STATE_BYTES);
                 KeyValue currentDisableTimeStamp = currentResult.getColumnLatest(TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES);
+                KeyValue dataTableKV = currentResult.getColumnLatest(TABLE_FAMILY_BYTES, DATA_TABLE_NAME_BYTES);
                
                 PIndexState currentState = PIndexState.fromSerializedValue(currentStateKV.getBuffer()[currentStateKV.getValueOffset()]);
                 
@@ -1192,10 +1198,23 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
                 }
                 
                 if (currentState != newState) {
-                    region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]>emptySet());
+                    byte[] dataTableKey = null;
+                    if(dataTableKV != null) {
+                        dataTableKey = SchemaUtil.getTableKey(tenantId, schemaName, dataTableKV.getValue());
+                    }
+                    if(dataTableKey != null) {
+                        // insert an empty KV to trigger a timestamp update on the data table row
+                        Put p = new Put(dataTableKey);
+                        p.add(TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, timeStamp, ByteUtil.EMPTY_BYTE_ARRAY);
+                        tableMetadata.add(p);
+                    }
+                    region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet());
                     // Invalidate from cache
                     Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.getEnvironment()).getMetaDataCache();
                     metaDataCache.invalidate(cacheKey);
+                    if(dataTableKey != null) {
+                        metaDataCache.invalidate(new ImmutableBytesPtr(dataTableKey));
+                    }
                 }
                 // Get client timeStamp from mutations, since it may get updated by the mutateRowsWithLocks call
                 long currentTime = MetaDataUtil.getClientTimeStamp(tableMetadata);
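
The client-observable effect of this hunk, as a sketch (PhoenixRuntime.getTable
usage as in the MetaDataRegionObserver hunk below; conn is assumed to be a
Phoenix JDBC connection): once the data table's header timestamp is bumped,
re-resolving the data table returns a PTable whose index list reflects the
DISABLE/ACTIVE transition.

    PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
    PTable dataTable = PhoenixRuntime.getTable(pconn, "S.T");
    for (PTable index : dataTable.getIndexes()) {
        // e.g. index S.I now reports PIndexState.DISABLE
        System.out.println(index.getName() + " => " + index.getIndexState());
    }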

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0272e284/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index 66b60ce..2820e59 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -123,12 +123,14 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
      *
      */
     public static class BuildIndexScheduleTask extends TimerTask {
-      // inProgress is to prevent timer from invoking a new task while previous one is still running
-      private final static AtomicInteger inProgress = new AtomicInteger(0);
-      RegionCoprocessorEnvironment env;
-      public BuildIndexScheduleTask(RegionCoprocessorEnvironment env) {
-        this.env = env;
-      }
+        // inProgress prevents the timer from invoking a new task while the previous
+        // one is still running
+        private final static AtomicInteger inProgress = new AtomicInteger(0);
+        RegionCoprocessorEnvironment env;
+
+        public BuildIndexScheduleTask(RegionCoprocessorEnvironment env) {
+            this.env = env;
+        }
       
         private String getJdbcUrl() {
             String zkQuorum = this.env.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM);
@@ -205,6 +207,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
 
                     byte[][] rowKeyMetaData = new byte[3][];
                     SchemaUtil.getVarChars(r.getRow(), 3, rowKeyMetaData);
+                    byte[] schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
                     byte[] indexTable = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
 
                     // validity check
@@ -216,8 +219,11 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                     if (conn == null) {
                         conn = DriverManager.getConnection(getJdbcUrl()).unwrap(PhoenixConnection.class);
                     }
-                    PTable dataPTable = PhoenixRuntime.getTable(conn, Bytes.toString(dataTable));
-                    PTable indexPTable = PhoenixRuntime.getTable(conn, Bytes.toString(indexTable));
+
+                    String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTable);
+                    String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTable);
+                    PTable dataPTable = PhoenixRuntime.getTable(conn, dataTableFullName);
+                    PTable indexPTable = PhoenixRuntime.getTable(conn, indexTableFullName);
                     if (!MetaDataUtil.tableRegionsOnline(this.env.getConfiguration(), indexPTable)) {
                         LOG.debug("Index rebuild has been skipped because not all regions of index table="
                                 + indexPTable.getName() + " are online.");
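
This hunk fixes the rebuild task for indexes living in a non-default schema:
the task previously resolved tables by bare table name and now reconstructs
the full "SCHEMA.TABLE" name from the SYSTEM.CATALOG row key (which is also
why the test above moved its tables into schema "S"). A sketch of the row-key
decomposition used here:

    // catalog row keys hold tenantId, schemaName and tableName as
    // null-separated VARCHARs; getVarChars() splits them apart
    byte[][] rowKeyMetaData = new byte[3][];
    SchemaUtil.getVarChars(r.getRow(), 3, rowKeyMetaData);
    String schemaName = Bytes.toString(rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX]);
    String indexName = Bytes.toString(rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX]);
    String fullName = SchemaUtil.getTableName(schemaName, indexName); // e.g. "S.I"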

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0272e284/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 31ab2f2..911e3ea 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -117,6 +117,7 @@ public class MutationState implements SQLCloseable {
     public long getUpdateCount() {
         return sizeOffset + numRows;
     }
+    
     /**
      * Combine a newer mutation with this one, where in the event of overlaps,
      * the newer one will take precedence.
@@ -284,6 +285,7 @@ public class MutationState implements SQLCloseable {
                     serverTimeStamp = timestamp;
                     if (result.wasUpdated()) {
                         // TODO: use bitset?
+                        table = connection.getMetaDataCache().getTable(new PTableKey(tenantId, table.getName().getString()));
                         PColumn[] columns = new PColumn[table.getColumns().size()];
                         for (Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry : entry.getValue().entrySet()) {
                             Map<PColumn,byte[]> valueEntry = rowEntry.getValue();
@@ -293,12 +295,12 @@ public class MutationState implements SQLCloseable {
                                 }
                             }
                         }
-                        table = connection.getMetaDataCache().getTable(new PTableKey(tenantId, table.getName().getString()));
                         for (PColumn column : columns) {
                             if (column != null) {
                                 table.getColumnFamily(column.getFamilyName().getString()).getColumn(column.getName().getString());
                             }
                         }
+                        tableRef.setTable(table);
                     }
                 }
             }
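
The ordering change in this hunk matters: the refreshed PTable must be fetched
before the upserted columns are re-resolved against it, and the refreshed
table is then pinned on the TableRef so index maintenance for the remaining
mutations sees the new index state. In outline:

    if (result.wasUpdated()) {
        // 1. fetch the server-updated table first...
        table = connection.getMetaDataCache().getTable(
                new PTableKey(tenantId, table.getName().getString()));
        // 2. ...then re-resolve the mutated columns against it...
        // 3. ...and pin it via the new mutator (see the TableRef hunk below)
        tableRef.setTable(table);
    }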

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0272e284/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
index cf8335e..149dad2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
@@ -72,6 +72,7 @@ public class PhoenixIndexFailurePolicy extends  KillServerOnFailurePolicy {
     @Override
     public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause) throws IOException {
         Set<HTableInterfaceReference> refs = attempted.asMap().keySet();
+        String indexTableName = "";
         try {
             for (HTableInterfaceReference ref : refs) {
                 long minTimeStamp = 0;
@@ -89,7 +90,7 @@ public class PhoenixIndexFailurePolicy extends  KillServerOnFailurePolicy {
                 }
                 
                 // Disable the index by using the updateIndexState method of MetaDataProtocol end point coprocessor.
-                String indexTableName = ref.getTableName();
+                indexTableName = ref.getTableName();
                 byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName);
                 HTableInterface systemTable = env.getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
                 MetaDataProtocol mdProxy = systemTable.coprocessorProxy(MetaDataProtocol.class, indexTableKey);
@@ -103,13 +104,14 @@ public class PhoenixIndexFailurePolicy extends  KillServerOnFailurePolicy {
                 MetaDataMutationResult result = mdProxy.updateIndexState(tableMetadata);
                 if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
                     LOG.warn("Attempt to disable index " + indexTableName + " failed with code = " + result.getMutationCode() + ". Will use default failure policy instead.");
-                    super.handleFailure(attempted, cause);
-                    throw new DoNotRetryIOException("Attemp to writes to " + indexTableName + " failed.", cause);
+                    throw new DoNotRetryIOException("Attempt to disable " + indexTableName + " failed.");
                 }
                 LOG.info("Successfully disabled index " + indexTableName + " due to an exception while writing updates.", cause);
             }
         } catch (Throwable t) {
+            LOG.warn("handleFailure failed", t);
             super.handleFailure(attempted, cause);
+            throw new DoNotRetryIOException("Attempt to write to " + indexTableName + " failed.", cause);
         }
     }
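
For context, a hedged sketch of the disable request the surrounding method
sends through the MetaDataProtocol proxy (only the call site appears in this
hunk; the exact cell layout below is an assumption based on the columns read
back in MetaDataEndpointImpl above):

    // assumed shape of the disable mutation; the constants are those used in
    // the MetaDataEndpointImpl hunk
    byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName);
    Put put = new Put(indexTableKey);
    put.add(TABLE_FAMILY_BYTES, INDEX_STATE_BYTES,
            PIndexState.DISABLE.getSerializedBytes());
    put.add(TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES,
            PDataType.LONG.toBytes(minTimeStamp));
    List<Mutation> tableMetadata = Collections.<Mutation>singletonList(put);
    MetaDataMutationResult result = mdProxy.updateIndexState(tableMetadata);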
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0272e284/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 39bd21f..340fe96 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1353,12 +1353,12 @@ public class MetaDataClient {
                         .setSchemaName(schemaName).setTableName(tableName).build().buildException();
                 default:
                     try {
-                        // TODO: should we update the parent table by removing the index?
-                        connection.removeTable(tenantId, tableName);
+                        connection.removeTable(tenantId, SchemaUtil.getTableName(schemaName, tableName));
                     } catch (TableNotFoundException ignore) { } // Ignore - just means wasn't cached
                     
                     // TODO: we need to drop the index data when a view is dropped
                     boolean dropMetaData = connection.getQueryServices().getProps().getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA);
+
                     if (result.getTable() != null && tableType != PTableType.VIEW) {
                         connection.setAutoCommit(true);
                         PTable table = result.getTable();
@@ -1402,7 +1402,7 @@ public class MetaDataClient {
         PName tenantId = connection.getTenantId();
         switch (mutationCode) {
         case TABLE_NOT_FOUND:
-            connection.removeTable(tenantId, tableName);
+            connection.removeTable(tenantId, SchemaUtil.getTableName(schemaName, tableName));
             throw new TableNotFoundException(schemaName, tableName);
         case UNALLOWED_TABLE_MUTATION:
             String columnName = null;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0272e284/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index e348a4c..74b6c41 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -116,6 +116,8 @@ public class PTableImpl implements PTable {
     private int estimatedSize;
     
     public PTableImpl() {
+        this.indexes = Collections.emptyList();
+        this.physicalNames = Collections.emptyList();
     }
 
     public PTableImpl(PName tenantId, String schemaName, String tableName, long timestamp, List<PColumnFamily> families) { // For base table of mapped VIEW
@@ -138,7 +140,7 @@ public class PTableImpl implements PTable {
             familyByString.put(family.getName().getString(), family);
         }
         this.families = families;
-        this.physicalNames = Collections.emptyList();;
+        this.physicalNames = Collections.emptyList();
     }
 
     public PTableImpl(long timeStamp) { // For delete marker
@@ -938,4 +940,4 @@ public class PTableImpl implements PTable {
     public PTableKey getKey() {
         return key;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0272e284/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java
index e820bb3..e58bb38 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.hbase.HConstants;
 
 
 public final class TableRef {
-    private final PTable table;
+    private PTable table;
     private final String alias;
     private final long upperBoundTimeStamp;
     private final long lowerBoundTimeStamp;
@@ -56,6 +56,10 @@ public final class TableRef {
     public PTable getTable() {
         return table;
     }
+    
+    public void setTable(PTable value) {
+        this.table = value;
+    }
 
     public String getTableAlias() {
         return alias;