You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by jy...@apache.org on 2014/10/06 20:20:18 UTC

git commit: PHOENIX-1289 Drop index during upsert may abort RS (daniel meng + jyates)

Repository: phoenix
Updated Branches:
  refs/heads/master 909d97596 -> faeab9355


PHOENIX-1289 Drop index during upsert may abort RS (daniel meng + jyates)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/faeab935
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/faeab935
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/faeab935

Branch: refs/heads/master
Commit: faeab935554404a042285a01127e9b88b8e3a47c
Parents: 909d975
Author: Jesse Yates <jy...@apache.org>
Authored: Mon Oct 6 10:58:14 2014 -0700
Committer: Jesse Yates <jy...@apache.org>
Committed: Mon Oct 6 11:04:58 2014 -0700

----------------------------------------------------------------------
 .../end2end/index/DropIndexDuringUpsertIT.java  | 177 ++++++++++++++
 .../index/write/KillServerOnFailurePolicy.java  |   2 +-
 .../index/PhoenixIndexFailurePolicy.java        | 239 +++++++++++--------
 3 files changed, 316 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/faeab935/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropIndexDuringUpsertIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropIndexDuringUpsertIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropIndexDuringUpsertIT.java
new file mode 100644
index 0000000..4e44ec8
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropIndexDuringUpsertIT.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
+import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
+import static org.apache.phoenix.util.TestUtil.LOCALHOST;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.PhoenixTestDriver;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.StringUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Maps;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class DropIndexDuringUpsertIT extends BaseTest { // PHOENIX-1289: drop an index while upserts are in flight
+    private static final int NUM_SLAVES = 4; // passed to startMiniCluster() in doSetup()
+    private static String url;
+    private static PhoenixTestDriver driver;
+    private static HBaseTestingUtility util;
+    // runs the UpsertTask concurrently with the DROP INDEX; NOTE(review): no shutdown is visible in this class
+    private static ExecutorService service = Executors.newCachedThreadPool();
+
+    private static final String SCHEMA_NAME = "S";
+    private static final String INDEX_TABLE_NAME = "I";
+    private static final String DATA_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, "T");
+    private static final String INDEX_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, "I");
+    /** Before each test: starts a mini cluster with NUM_SLAVES slaves and registers a test driver for its URL. */
+    @Before
+    public void doSetup() throws Exception {
+        Configuration conf = HBaseConfiguration.create();
+        setUpConfigForMiniCluster(conf);
+        conf.setInt("hbase.client.retries.number", 2);
+        conf.setInt("hbase.client.pause", 5000);
+        conf.setInt("hbase.balancer.period", Integer.MAX_VALUE);
+        conf.setLong(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB, 0);
+        util = new HBaseTestingUtility(conf);
+        util.startMiniCluster(NUM_SLAVES);
+        String clientPort = util.getConfiguration().get(QueryServices.ZOOKEEPER_PORT_ATTRIB);
+        url = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST + JDBC_PROTOCOL_SEPARATOR + clientPort
+                + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
+
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        // Driver-side property; NOTE(review): the mini cluster is already running at this point
+        props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
+        driver = initAndRegisterDriver(url, new ReadOnlyProps(props.entrySet().iterator()));
+    }
+    /** After each test: destroys the driver and, even if that throws, shuts the mini cluster down. */
+    @After
+    public void tearDown() throws Exception {
+        try {
+            destroyDriver(driver);
+        } finally {
+            util.shutdownMiniCluster();
+        }
+    }
+    /** Drops the index while a background thread is upserting, then checks that the upsert thread still succeeds. */
+    @Test(timeout = 300000)
+    public void testWriteFailureDropIndex() throws Exception {
+        String query;
+        ResultSet rs;
+
+        // create the table and ensure it's empty
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = driver.connect(url, props); // NOTE(review): never closed in this method
+        conn.createStatement().execute(
+                "CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+        query = "SELECT * FROM " + DATA_TABLE_FULL_NAME;
+        rs = conn.createStatement().executeQuery(query);
+        assertFalse(rs.next());
+
+        // create the index and ensure it's empty as well
+        conn.createStatement().execute(
+                "CREATE INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)");
+        query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME;
+        rs = conn.createStatement().executeQuery(query);
+        assertFalse(rs.next());
+
+        // Verify the metadata for the index is correct: exactly one INDEX entry, in ACTIVE state.
+        rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
+                new String[] { PTableType.INDEX.toString() });
+        assertTrue(rs.next());
+        assertEquals(INDEX_TABLE_NAME, rs.getString(3));
+        assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
+        assertFalse(rs.next());
+
+        // start the upserts on a separate thread
+        Future<Boolean> future = service.submit(new UpsertTask());
+        Thread.sleep(500); // presumably gives the upsert thread a head start before the drop - timing is not guaranteed
+
+        // while the upserts are (expected to be) still running, drop the index table
+        conn.createStatement().execute("drop index " + INDEX_TABLE_NAME + " on " + DATA_TABLE_FULL_NAME);
+
+        // verify the index is dropped: querying it must now fail
+        query = "SELECT count(1) FROM " + INDEX_TABLE_FULL_NAME;
+        try {
+            conn.createStatement().executeQuery(query);
+            fail();
+        } catch (SQLException e) { // expected: the query above must fail once the index is gone
+        }
+
+        // assert {@link KillServerOnFailurePolicy} is not triggered: the upsert task must complete and return true
+        assertTrue(future.get());
+    }
+    /** Callable that upserts row key "a" 500 times into the data table on its own connection, committing each time. */
+    private static class UpsertTask implements Callable<Boolean> {
+
+        private Connection conn = null; // opened in the constructor; NOTE(review): never closed
+
+        public UpsertTask() throws SQLException {
+            Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+            conn = driver.connect(url, props);
+        }
+
+        @Override
+        public Boolean call() throws Exception {
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
+            for (int i = 0; i < 500; i++) {
+                stmt.setString(1, "a");
+                stmt.setString(2, "x");
+                stmt.setString(3, Integer.toString(i));
+                stmt.execute();
+                conn.commit();
+            }
+            return true; // only reached if no upsert or commit threw
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/faeab935/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/KillServerOnFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/KillServerOnFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/KillServerOnFailurePolicy.java
index 0b84cdf..2fb43b5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/KillServerOnFailurePolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/KillServerOnFailurePolicy.java
@@ -61,7 +61,7 @@ public class KillServerOnFailurePolicy implements IndexFailurePolicy {
 
   @Override
   public void
-      handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause) throws IOException {
+      handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause){
     // cleanup resources
     this.stop("Killing ourselves because of an error:" + cause);
     // notify the regionserver of the failure

http://git-wip-us.apache.org/repos/asf/phoenix/blob/faeab935/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
index b683c20..565b28c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
@@ -19,13 +19,7 @@ package org.apache.phoenix.index;
 
 import java.io.IOException;
 import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -89,91 +83,79 @@ public class PhoenixIndexFailurePolicy extends  KillServerOnFailurePolicy {
       this.env = env;
     }
 
+    /**
+     * Attempt to disable the index table when we can't write to it, preventing future updates until the index is
+     * brought up to date, but allowing historical reads to continue until then.
+     * <p>
+     * In the case that we cannot reach the metadata information, we will fall back to the default policy and kill
+     * this server, so we can attempt to replay the edits on restart.
+     * </p>
+     * @param attempted the mutations that were attempted to be written and the tables to which they were written
+     * @param cause root cause of the failure
+     */
     @Override
-    public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause) throws IOException {
-        Set<HTableInterfaceReference> refs = attempted.asMap().keySet();
-        List<String> indexTableNames = new ArrayList<String>(1);
+    public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause) {
+
         try {
-            for (HTableInterfaceReference ref : refs) {
-                long minTimeStamp = 0;
-                Collection<Mutation> mutations = attempted.get(ref);
-                if (mutations != null) {
-                  for (Mutation m : mutations) {
+            handleFailureWithExceptions(attempted, cause);
+        } catch (Throwable t) {
+            LOG.warn("handleFailure failed", t);
+            super.handleFailure(attempted, cause);
+        }
+    }
+
+    private void handleFailureWithExceptions(Multimap<HTableInterfaceReference, Mutation> attempted,
+            Exception cause) throws Throwable {
+        Set<HTableInterfaceReference> refs = attempted.asMap().keySet();
+        Map<String, Long> indexTableNames = new HashMap<String, Long>(refs.size());
+        // start by looking at all the tables to which we attempted to write
+        for (HTableInterfaceReference ref : refs) {
+            long minTimeStamp = 0;
+
+            // intended: minimum timestamp across all the mutations we attempted on that table. NOTE(review): the '<' test below appears to keep the largest timestamp instead - confirm
+            Collection<Mutation> mutations = attempted.get(ref);
+            if (mutations != null) {
+                for (Mutation m : mutations) {
                     for (List<Cell> kvs : m.getFamilyCellMap().values()) {
-                      for (Cell kv : kvs) {
-                        if (minTimeStamp == 0 || (kv.getTimestamp() >=0 && minTimeStamp < kv.getTimestamp())) {
-                          minTimeStamp = kv.getTimestamp();
-                        }
-                      }
-                    }
-                  }
-                }
-                
-                if(ref.getTableName().startsWith(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX)) {
-                    PhoenixConnection conn = null;
-                    try {
-                        conn = QueryUtil.getConnection(this.env.getConfiguration()).unwrap(
-                                    PhoenixConnection.class);
-                        String userTableName = MetaDataUtil.getUserTableName(ref.getTableName());
-                        PTable dataTable = PhoenixRuntime.getTable(conn, userTableName);
-                        List<PTable> indexes = dataTable.getIndexes();
-                        // local index used to get view id from index mutation row key.
-                        PTable localIndex = null;
-                        Map<ImmutableBytesWritable, String> localIndexNames =
-                                new HashMap<ImmutableBytesWritable, String>();
-                        for (PTable index : indexes) {
-                            if (index.getIndexType() == IndexType.LOCAL
-                                    && index.getIndexState() == PIndexState.ACTIVE) {
-                                if (localIndex == null) localIndex = index;
-                                localIndexNames.put(new ImmutableBytesWritable(MetaDataUtil.getViewIndexIdDataType().toBytes(
-                                    index.getViewIndexId())),index.getName().getString());
-                            }
-                        }
-                        if(localIndex == null) continue;
-                        
-                        IndexMaintainer indexMaintainer = localIndex.getIndexMaintainer(dataTable);
-                        HRegionInfo regionInfo = this.env.getRegion().getRegionInfo();
-                        int offset =
-                                regionInfo.getStartKey().length == 0 ? regionInfo.getEndKey().length
-                                        : regionInfo.getStartKey().length;
-                        byte[] viewId = null;
-                        for (Mutation mutation : mutations) {
-                            viewId = indexMaintainer.getViewIndexIdFromIndexRowKey(new ImmutableBytesWritable(mutation.getRow(), offset, mutation.getRow().length - offset));
-                            String indexTableName = localIndexNames.get(new ImmutableBytesWritable(viewId)); 
-                            if(!indexTableNames.contains(indexTableName)) {
-                                indexTableNames.add(indexTableName);
-                            }
-                        }
-                    } catch (ClassNotFoundException e) {
-                        throw new IOException(e);
-                    } catch (SQLException e) {
-                        throw new IOException(e);
-                    } finally {
-                        if (conn != null) {
-                            try {
-                                conn.close();
-                            } catch (SQLException e) {
-                                throw new IOException(e);
+                        for (Cell kv : kvs) {
+                            if (minTimeStamp == 0 || (kv.getTimestamp() >= 0 && minTimeStamp < kv.getTimestamp())) {
+                                minTimeStamp = kv.getTimestamp();
                             }
                         }
                     }
-                } else {
-                    indexTableNames.add(ref.getTableName());
                 }
+            }
+
+            // it's a local index table, so we need to convert it to the index table names we should disable
+            if (ref.getTableName().startsWith(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX)) {
+                for (String tableName : getLocalIndexNames(ref, mutations)) {
+                    indexTableNames.put(tableName, minTimeStamp);
+                }
+            } else {
+                indexTableNames.put(ref.getTableName(), minTimeStamp);
+            }
+        }
+
+        // for all the index tables that we've found, try to disable them; if that fails, throw so that handleFailure falls back to the default policy
+        for (Map.Entry<String, Long> tableTimeElement :indexTableNames.entrySet()){
+            String indexTableName = tableTimeElement.getKey();
+            long minTimeStamp = tableTimeElement.getValue();
+            // Disable the index by using the updateIndexState method of MetaDataProtocol end point coprocessor.
+            byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName);
+            HTableInterface
+                    systemTable =
+                    env.getTable(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES));
+            // Mimic the Put that gets generated by the client on an update of the index state
+            Put put = new Put(indexTableKey);
+            put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
+                    PIndexState.DISABLE.getSerializedBytes());
+            put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES,
+                    PDataType.LONG.toBytes(minTimeStamp));
+            final List<Mutation> tableMetadata = Collections.<Mutation>singletonList(put);
 
-                for (String indexTableName : indexTableNames) {
-                    // Disable the index by using the updateIndexState method of MetaDataProtocol end point coprocessor.
-                    byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName);
-                    HTableInterface systemTable = env.getTable(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES));
-                    // Mimic the Put that gets generated by the client on an update of the index state
-                    Put put = new Put(indexTableKey);
-                    put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES, PIndexState.DISABLE.getSerializedBytes());
-                    put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, PDataType.LONG.toBytes(minTimeStamp));
-                    final List<Mutation> tableMetadata = Collections.<Mutation>singletonList(put);
-                    
-                    final Map<byte[], MetaDataResponse> results = 
-                            systemTable.coprocessorService(MetaDataService.class, indexTableKey, indexTableKey,
-                                new Batch.Call<MetaDataService, MetaDataResponse>() {
+            final Map<byte[], MetaDataResponse> results =
+                    systemTable.coprocessorService(MetaDataService.class, indexTableKey, indexTableKey,
+                            new Batch.Call<MetaDataService, MetaDataResponse>() {
                                 @Override
                                 public MetaDataResponse call(MetaDataService instance) throws IOException {
                                     ServerRpcController controller = new ServerRpcController();
@@ -185,30 +167,85 @@ public class PhoenixIndexFailurePolicy extends  KillServerOnFailurePolicy {
                                         builder.addTableMetadataMutations(mp.toByteString());
                                     }
                                     instance.updateIndexState(controller, builder.build(), rpcCallback);
-                                    if(controller.getFailedOn() != null) {
+                                    if (controller.getFailedOn() != null) {
                                         throw controller.getFailedOn();
                                     }
                                     return rpcCallback.get();
                                 }
                             });
-                    if(results.isEmpty()){
-                        throw new IOException("Didn't get expected result size");
-                    }
-                    MetaDataResponse tmpResponse = results.values().iterator().next();
-                    MetaDataMutationResult result = MetaDataMutationResult.constructFromProto(tmpResponse);                
-                
-                    if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
-                        LOG.warn("Attempt to disable index " + indexTableName + " failed with code = " + result.getMutationCode() + ". Will use default failure policy instead.");
-                        throw new DoNotRetryIOException("Attemp to disable " + indexTableName + " failed.");
-                    }
-                    LOG.info("Successfully disabled index " + indexTableName + " due to an exception while writing updates.", cause);
-                }
+            if (results.isEmpty()) {
+                throw new IOException("Didn't get expected result size");
             }
-        } catch (Throwable t) {
-            LOG.warn("handleFailure failed", t);
-            super.handleFailure(attempted, cause);
-            throw new DoNotRetryIOException("Attemp to writes to " + indexTableNames + " failed.", cause);
+            MetaDataResponse tmpResponse = results.values().iterator().next();
+            MetaDataMutationResult result = MetaDataMutationResult.constructFromProto(tmpResponse);
+
+            if (result.getMutationCode() == MutationCode.TABLE_NOT_FOUND) {
+                LOG.info("Index " + indexTableName + " has been dropped. Ignore uncommitted mutations");
+                continue;
+            }
+            if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
+                LOG.warn("Attempt to disable index " + indexTableName + " failed with code = "
+                        + result.getMutationCode() + ". Will use default failure policy instead.");
+                throw new DoNotRetryIOException("Attempt to disable " + indexTableName + " failed.");
+            }
+            LOG.info("Successfully disabled index " + indexTableName + " due to an exception while writing updates.",
+                    cause);
         }
     }
 
+    private Collection<? extends String> getLocalIndexNames(HTableInterfaceReference ref,
+            Collection<Mutation> mutations) throws IOException { // maps the given mutations to local index names
+        Set<String> indexTableNames = new HashSet<String>(1); // result: one entry per distinct index name
+        PhoenixConnection conn = null;
+        try {
+            conn = QueryUtil.getConnection(this.env.getConfiguration()).unwrap(
+                    PhoenixConnection.class);
+            String userTableName = MetaDataUtil.getUserTableName(ref.getTableName());
+            PTable dataTable = PhoenixRuntime.getTable(conn, userTableName);
+            List<PTable> indexes = dataTable.getIndexes();
+            // first ACTIVE local index found; its maintainer is used to get the view id from each index mutation row key.
+            PTable localIndex = null;
+            Map<ImmutableBytesWritable, String> localIndexNames =
+                    new HashMap<ImmutableBytesWritable, String>(); // serialized view index id -> index name
+            for (PTable index : indexes) {
+                if (index.getIndexType() == IndexType.LOCAL
+                        && index.getIndexState() == PIndexState.ACTIVE) {
+                    if (localIndex == null) localIndex = index;
+                    localIndexNames.put(new ImmutableBytesWritable(MetaDataUtil.getViewIndexIdDataType().toBytes(
+                            index.getViewIndexId())), index.getName().getString());
+                }
+            }
+            if (localIndex == null) { // the data table has no ACTIVE local index
+                return Collections.emptySet();
+            }
+            // the view id is read from each row key starting at 'offset': the region start key length, or the end key length if the start key is empty
+            IndexMaintainer indexMaintainer = localIndex.getIndexMaintainer(dataTable);
+            HRegionInfo regionInfo = this.env.getRegion().getRegionInfo();
+            int offset =
+                    regionInfo.getStartKey().length == 0 ? regionInfo.getEndKey().length
+                            : regionInfo.getStartKey().length;
+            byte[] viewId = null;
+            for (Mutation mutation : mutations) {
+                viewId =
+                        indexMaintainer.getViewIndexIdFromIndexRowKey(
+                                new ImmutableBytesWritable(mutation.getRow(), offset,
+                                        mutation.getRow().length - offset));
+                String indexTableName = localIndexNames.get(new ImmutableBytesWritable(viewId));
+                indexTableNames.add(indexTableName); // NOTE(review): adds null if the view id is not in the map - confirm callers cope
+            }
+        } catch (ClassNotFoundException e) { // wrapped so that callers only see IOException
+            throw new IOException(e);
+        } catch (SQLException e) {
+            throw new IOException(e);
+        } finally {
+            if (conn != null) {
+                try {
+                    conn.close();
+                } catch (SQLException e) {
+                    throw new IOException(e); // NOTE(review): thrown from finally, so it replaces any exception already propagating
+                }
+            }
+        }
+        return indexTableNames;
+    }
 }