You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/08/12 01:22:39 UTC

[1/2] PHOENIX-1058 Support index region split on its corresponding data region split (Rajeshbabu)

Repository: phoenix
Updated Branches:
  refs/heads/4.0 2ed929a75 -> 4fa6146b3


http://git-wip-us.apache.org/repos/asf/phoenix/blob/4fa6146b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index c8c031b..68cdb26 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -525,6 +525,38 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         }
     }
     
+    /*
+     * Returns the view index id extracted from the given index row key.
+     */
+    public byte[] getViewIndexIdFromIndexRowKey(ImmutableBytesWritable indexRowKeyPtr) {
+        assert(isLocalIndex);
+        RowKeySchema indexRowKeySchema = getIndexRowKeySchema();
+        // TODO: add logic to skip the region start key as well, because the region start key cannot be found in IndexHalfStoreFileReader.
+        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+        TrustedByteArrayOutputStream stream =
+                new TrustedByteArrayOutputStream(estimatedIndexRowKeyBytes);
+        DataOutput output = new DataOutputStream(stream);
+        try {
+            int indexPosOffset = (!isLocalIndex && nIndexSaltBuckets > 0 ? 1 : 0) + (isMultiTenant ? 1 : 0) + (viewIndexId == null ? 0 : 1);
+            Boolean hasValue =
+                    indexRowKeySchema.iterator(indexRowKeyPtr, ptr, indexPosOffset);
+            if (Boolean.TRUE.equals(hasValue)) {
+                    output.write(ptr.get(), ptr.getOffset(), ptr.getLength());
+            }
+            int length = stream.size();
+            byte[] dataRowKey = stream.getBuffer();
+            return dataRowKey.length == length ? dataRowKey : Arrays.copyOf(dataRowKey, length);
+        } catch (IOException e) {
+            throw new RuntimeException(e); // Impossible
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e); // Impossible
+            }
+        }
+    }
+
     private volatile RowKeySchema indexRowKeySchema;
     
     // We have enough information to generate the index row key schema

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4fa6146b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LocalIndexParallelIteratorRegionSplitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LocalIndexParallelIteratorRegionSplitter.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LocalIndexParallelIteratorRegionSplitter.java
index c3a38d5..0ccd738 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LocalIndexParallelIteratorRegionSplitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LocalIndexParallelIteratorRegionSplitter.java
@@ -37,6 +37,7 @@ public class LocalIndexParallelIteratorRegionSplitter extends DefaultParallelIte
 
     @Override
     protected List<HRegionLocation> getAllRegions() throws SQLException {
+        context.getConnection().getQueryServices().clearTableRegionCache(tableRef.getTable().getPhysicalName().getBytes());
         return context.getConnection().getQueryServices().getAllTableRegions(tableRef.getTable().getPhysicalName().getBytes());
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4fa6146b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index ee0be95..4965813 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -59,6 +59,8 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
+import org.apache.hadoop.hbase.regionserver.IndexHalfStoreFileReaderGenerator;
+import org.apache.hadoop.hbase.regionserver.LocalIndexSplitter;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -620,6 +622,21 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts);
             }
             
+            if (descriptor.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES) != null
+                    && Boolean.TRUE.equals(PDataType.BOOLEAN.toObject(descriptor
+                            .getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES)))) {
+                if (!descriptor.hasCoprocessor(IndexHalfStoreFileReaderGenerator.class.getName())) {
+                    descriptor.addCoprocessor(IndexHalfStoreFileReaderGenerator.class.getName(),
+                        null, 1, null);
+                }
+            } else {
+                if (!descriptor.hasCoprocessor(LocalIndexSplitter.class.getName())
+                        && !SchemaUtil.isMetaTable(tableName)
+                        && !SchemaUtil.isSequenceTable(tableName)) {
+                    descriptor.addCoprocessor(LocalIndexSplitter.class.getName(), null, 1, null);
+                }
+            }
+
             // Setup split policy on Phoenix metadata table to ensure that the key values of a Phoenix table
             // stay on the same region.
             if (SchemaUtil.isMetaTable(tableName)) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4fa6146b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 2147026..c57b555 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -153,7 +153,7 @@ import com.google.common.collect.Sets;
 public abstract class BaseTest {
     private static final Map<String,String> tableDDLMap;
     private static Logger logger = Logger.getLogger("BaseTest.class");
-    
+    private static HBaseTestingUtility utility = null; 
     static {
         ImmutableMap.Builder<String,String> builder = ImmutableMap.builder();
         builder.put(ENTITY_HISTORY_TABLE_NAME,"create table " + ENTITY_HISTORY_TABLE_NAME +
@@ -473,7 +473,7 @@ public abstract class BaseTest {
      */
     private static String initMiniCluster(Configuration conf) {
         setUpConfigForMiniCluster(conf);
-        final HBaseTestingUtility utility = new HBaseTestingUtility(conf);
+        utility = new HBaseTestingUtility(conf);
         try {
             utility.startMiniCluster();
             String clientPort = utility.getConfiguration().get(QueryServices.ZOOKEEPER_PORT_ATTRIB);
@@ -504,6 +504,7 @@ public abstract class BaseTest {
         setTestConfigForDistribuedCluster(conf);
         try {
             IntegrationTestingUtility util =  new IntegrationTestingUtility(conf);
+            utility = util;
             util.initializeCluster(NUM_SLAVES_BASE);
         } catch (Exception e) {
             throw new RuntimeException(e);
@@ -1285,4 +1286,8 @@ public abstract class BaseTest {
         assertTrue("Could not find " + errorResult + " in expected results: " + expectedResults + " with actual results: " + actualResults, errorResult == null);
         assertEquals(count, expectedCount);
     }
+    
+    public HBaseTestingUtility getUtility() {
+        return utility;
+    }
 }


[2/2] git commit: PHOENIX-1058 Support index region split on its corresponding data region split (Rajeshbabu)

Posted by ja...@apache.org.
PHOENIX-1058 Support index region split on its corresponding data region split (Rajeshbabu)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/4fa6146b
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/4fa6146b
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/4fa6146b

Branch: refs/heads/4.0
Commit: 4fa6146b3a2b0352f2b71e176a36d08499307438
Parents: 2ed929a
Author: Rajeshbabu Chintaguntla <ra...@huawei.com>
Authored: Mon Aug 11 23:53:34 2014 +0530
Committer: James Taylor <jt...@salesforce.com>
Committed: Mon Aug 11 16:26:01 2014 -0700

----------------------------------------------------------------------
 .../org/apache/phoenix/end2end/BaseQueryIT.java |   6 +
 .../org/apache/phoenix/end2end/QueryIT.java     |  24 +-
 .../phoenix/end2end/index/LocalIndexIT.java     | 110 +++
 .../regionserver/IndexHalfStoreFileReader.java  | 458 +++++++++
 .../IndexHalfStoreFileReaderGenerator.java      | 159 +++
 .../regionserver/IndexSplitTransaction.java     | 974 +++++++++++++++++++
 .../hbase/regionserver/LocalIndexSplitter.java  | 101 ++
 .../apache/phoenix/index/IndexMaintainer.java   |  32 +
 ...ocalIndexParallelIteratorRegionSplitter.java |   1 +
 .../query/ConnectionQueryServicesImpl.java      |  17 +
 .../java/org/apache/phoenix/query/BaseTest.java |   9 +-
 11 files changed, 1876 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/4fa6146b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseQueryIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseQueryIT.java
index d736612..4263dd2 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseQueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseQueryIT.java
@@ -113,6 +113,12 @@ public abstract class BaseQueryIT extends BaseClientManagedTimeIT {
                 + "    B_STRING, " + "    A_DATE)" });
         testCases.add(new String[] { "CREATE INDEX " + ATABLE_INDEX_NAME + " ON aTable (a_integer) INCLUDE ("
                 + "    A_STRING, " + "    B_STRING, " + "    A_DATE)" });
+        testCases.add(new String[] { "CREATE LOCAL INDEX " + ATABLE_INDEX_NAME + " ON aTable (a_integer DESC) INCLUDE ("
+                + "    A_STRING, " + "    B_STRING, " + "    A_DATE)" });
+        testCases.add(new String[] { "CREATE LOCAL INDEX " + ATABLE_INDEX_NAME + " ON aTable (a_integer, a_string) INCLUDE ("
+                + "    B_STRING, " + "    A_DATE)" });
+        testCases.add(new String[] { "CREATE LOCAL INDEX " + ATABLE_INDEX_NAME + " ON aTable (a_integer) INCLUDE ("
+                + "    A_STRING, " + "    B_STRING, " + "    A_DATE)" });
         testCases.add(new String[] { "" });
         return testCases;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4fa6146b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java
index d9c3862..35140f4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java
@@ -739,19 +739,17 @@ public class QueryIT extends BaseQueryIT {
             
             byte[] tableName = Bytes.toBytes(ATABLE_NAME);
             admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
-            if (admin.tableExists(TableName.valueOf(MetaDataUtil.getLocalIndexTableName("atable")))) {
-                HTable htable = (HTable) conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(tableName);
-                htable.clearRegionCache();
-                int nRegions = htable.getRegionLocations().size();
-                admin.split(tableName, ByteUtil.concat(Bytes.toBytes(tenantId), Bytes.toBytes("00A" + Character.valueOf((char) ('3' + nextRunCount())) + ts))); // vary split point with test run
-                int retryCount = 0;
-                do {
-                    Thread.sleep(2000);
-                    retryCount++;
-                    //htable.clearRegionCache();
-                } while (retryCount < 10 && htable.getRegionLocations().size() == nRegions);
-                assertNotEquals(nRegions, htable.getRegionLocations().size());
-            }
+            HTable htable = (HTable) conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(tableName);
+            htable.clearRegionCache();
+            int nRegions = htable.getRegionLocations().size();
+            admin.split(tableName, ByteUtil.concat(Bytes.toBytes(tenantId), Bytes.toBytes("00A" + Character.valueOf((char) ('3' + nextRunCount())) + ts))); // vary split point with test run
+            int retryCount = 0;
+            do {
+                Thread.sleep(2000);
+                retryCount++;
+                //htable.clearRegionCache();
+            } while (retryCount < 10 && htable.getRegionLocations().size() == nRegions);
+            assertNotEquals(nRegions, htable.getRegionLocations().size());
             
             statement.setString(1, tenantId);
             rs = statement.executeQuery();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4fa6146b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
index 28eddc2..a291a37 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
@@ -27,8 +27,10 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -36,6 +38,8 @@ import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
@@ -48,6 +52,7 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
@@ -572,4 +577,109 @@ public class LocalIndexIT extends BaseIndexIT {
             conn1.close();
         }
     }
+
+    @Test
+    public void testLocalIndexScanWithInList() throws Exception {
+        createBaseTable(DATA_TABLE_NAME, null, "('e','i','o')");
+        Connection conn1 = DriverManager.getConnection(getUrl());
+        try{
+            conn1.createStatement().execute("UPSERT INTO " + DATA_TABLE_NAME + " values('b',1,2,4,'z')");
+            conn1.createStatement().execute("UPSERT INTO " + DATA_TABLE_NAME + " values('f',1,2,3,'a')");
+            conn1.createStatement().execute("UPSERT INTO " + DATA_TABLE_NAME + " values('j',2,4,2,'a')");
+            conn1.createStatement().execute("UPSERT INTO " + DATA_TABLE_NAME + " values('q',3,1,1,'c')");
+            conn1.commit();
+            conn1.createStatement().execute("CREATE LOCAL INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_NAME + "(v1) include (k3)");
+            
+            ResultSet rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + INDEX_TABLE_NAME);
+            assertTrue(rs.next());
+            
+            HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
+            
+            String query = "SELECT t_id FROM " + DATA_TABLE_NAME +" where (v1,k3) IN (('z',4),('a',2))";
+            rs = conn1.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals("j", rs.getString("t_id"));
+            assertTrue(rs.next());
+            assertEquals("b", rs.getString("t_id"));
+            assertFalse(rs.next());
+       } finally {
+            conn1.close();
+        }
+    }
+
+    @Test
+    public void testLocalIndexScanAfterRegionSplit() throws Exception {
+        createBaseTable(DATA_TABLE_NAME, null, "('e','j','o')");
+        Connection conn1 = DriverManager.getConnection(getUrl());
+        try{
+            String[] strings = {"a","b","c","d","e","f","g","h","i","j","k","l","m","n","o","p","q","r","s","t","u","v","w","x","y","z"};
+            for (int i = 0; i < 26; i++) {
+                conn1.createStatement().execute(
+                    "UPSERT INTO " + DATA_TABLE_NAME + " values('"+strings[i]+"'," + i + ","
+                            + (i + 1) + "," + (i + 2) + ",'" + strings[25 - i] + "')");
+            }
+            conn1.commit();
+            conn1.createStatement().execute("CREATE LOCAL INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_NAME + "(v1)");
+            conn1.createStatement().execute("CREATE LOCAL INDEX " + INDEX_TABLE_NAME + "_2 ON " + DATA_TABLE_NAME + "(k3)");
+
+            ResultSet rs = conn1.createStatement().executeQuery("SELECT * FROM " + DATA_TABLE_NAME);
+            assertTrue(rs.next());
+            
+            HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
+            HMaster master = getUtility().getHBaseCluster().getMaster();
+            for (int i = 1; i < 5; i++) {
+                
+                admin.split(Bytes.toBytes(DATA_TABLE_NAME), ByteUtil.concat(Bytes.toBytes(strings[3*i])));
+                List<HRegionInfo> regionsOfUserTable =
+                        master.getAssignmentManager().getRegionStates().getRegionsOfTable(TableName.valueOf(DATA_TABLE_NAME));
+
+                while (regionsOfUserTable.size() != (4+i)) {
+                    Thread.sleep(100);
+                    regionsOfUserTable = master.getAssignmentManager().getRegionStates().getRegionsOfTable(TableName.valueOf(DATA_TABLE_NAME));
+                }
+                assertEquals(4+i, regionsOfUserTable.size());
+                List<HRegionInfo> regionsOfIndexTable = master.getAssignmentManager().getRegionStates()
+                                .getRegionsOfTable(TableName.valueOf(MetaDataUtil.getLocalIndexTableName(DATA_TABLE_NAME)));
+                while (regionsOfIndexTable.size() != (4+i)) {
+                    Thread.sleep(100);
+                    regionsOfIndexTable = master.getAssignmentManager().getRegionStates()
+                            .getRegionsOfTable(TableName.valueOf(MetaDataUtil.getLocalIndexTableName(DATA_TABLE_NAME)));
+                }
+                assertEquals(4 + i, regionsOfIndexTable.size());
+                String query = "SELECT t_id,k1,v1 FROM " + DATA_TABLE_NAME;
+                rs = conn1.createStatement().executeQuery("EXPLAIN "+query);
+                assertEquals(
+                    "CLIENT PARALLEL " + (4+i) + "-WAY RANGE SCAN OVER "
+                            + MetaDataUtil.getLocalIndexTableName(DATA_TABLE_NAME)+" [-32768]\n"+
+                            "CLIENT MERGE SORT",
+                    QueryUtil.getExplainPlan(rs));
+                rs = conn1.createStatement().executeQuery(query);
+                Thread.sleep(1000);
+                for (int j = 0; j < 26; j++) {
+                    assertTrue(rs.next());
+                    assertEquals(strings[25-j], rs.getString("t_id"));
+                    assertEquals(25-j, rs.getInt("k1"));
+                    assertEquals(strings[j], rs.getString("V1"));
+                }
+                
+                query = "SELECT t_id,k1,k3 FROM " + DATA_TABLE_NAME;
+                rs = conn1.createStatement().executeQuery("EXPLAIN "+query);
+                assertEquals(
+                    "CLIENT PARALLEL " + (4+i) + "-WAY RANGE SCAN OVER "
+                            + MetaDataUtil.getLocalIndexTableName(DATA_TABLE_NAME)+" [-32767]\n"+
+                            "CLIENT MERGE SORT",
+                    QueryUtil.getExplainPlan(rs));
+                rs = conn1.createStatement().executeQuery(query);
+                Thread.sleep(1000);
+                for (int j = 0; j < 26; j++) {
+                    assertTrue(rs.next());
+                    assertEquals(strings[j], rs.getString("t_id"));
+                    assertEquals(j, rs.getInt("k1"));
+                    assertEquals(j+2, rs.getInt("k3"));
+                }
+            }
+       } finally {
+            conn1.close();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4fa6146b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java
new file mode 100644
index 0000000..d8650cf
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java
@@ -0,0 +1,458 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.Reference;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.index.IndexMaintainer;
+
+/**
+ * A facade for a {@link org.apache.hadoop.hbase.io.hfile.HFile.Reader} that serves up either the
+ * top or bottom half of a HFile where 'bottom' is the first half of the file containing the keys
+ * that sort lowest and 'top' is the second half of the file with keys that sort greater than those
+ * of the bottom half. The top includes the split file's midkey, or the key that follows it if
+ * the midkey does not exist in the file.
+ * 
+ * <p>
+ * This type works in tandem with the {@link Reference} type. This class is used for reading,
+ * while Reference is used for writing.
+ * 
+ * <p>
+ * This file is not splittable. Calls to {@link #midkey()} return null.
+ */
+
+public class IndexHalfStoreFileReader extends StoreFile.Reader {
+    private static final int ROW_KEY_LENGTH = 2;
+    private final boolean top;
+    // This is the key we split around. It's the first possible entry on a row:
+    // i.e. empty column and a timestamp of LATEST_TIMESTAMP.
+    private final byte[] splitkey;
+    private final byte[] splitRow;
+    private final Map<ImmutableBytesWritable, IndexMaintainer> indexMaintainers;
+    private final byte[][] viewConstants; 
+    private final int offset;
+    private final HRegionInfo regionInfo;
+    private final HRegionInfo parent;
+
+    /**
+     * @param p
+     * @param cacheConf
+     * @param r
+     * @throws IOException
+     */
+    public IndexHalfStoreFileReader(final FileSystem fs, final Path p, final CacheConfig cacheConf,
+            final Reference r, final Configuration conf,
+            final Map<ImmutableBytesWritable, IndexMaintainer> indexMaintainers,
+            final byte[][] viewConstants, final HRegionInfo regionInfo,
+            final HRegionInfo parent) throws IOException {
+        super(fs, p, cacheConf, conf);
+        this.splitkey = r.getSplitKey();
+        // Is it top or bottom half?
+        this.top = Reference.isTopFileRegion(r.getFileRegion());
+        this.splitRow = CellUtil.cloneRow(KeyValue.createKeyValueFromKey(splitkey));
+        this.indexMaintainers = indexMaintainers;
+        this.viewConstants = viewConstants;
+        this.regionInfo = regionInfo;
+        this.parent = parent;
+        this.offset =
+                parent.getStartKey().length != 0 ? parent.getStartKey().length
+                        : parent.getEndKey().length;
+    }
+
+    /**
+     * @param p
+     * @param cacheConf
+     * @param r
+     * @throws IOException
+     */
+    public IndexHalfStoreFileReader(final FileSystem fs, final Path p, final CacheConfig cacheConf,
+            final FSDataInputStreamWrapper in, long size, final Reference r,
+            final Configuration conf,
+            final Map<ImmutableBytesWritable, IndexMaintainer> indexMaintainers,
+            final byte[][] viewConstants, final HRegionInfo regionInfo, final HRegionInfo parent)
+            throws IOException {
+        super(fs, p, in, size, cacheConf, conf);
+        this.splitkey = r.getSplitKey();
+        // Is it top or bottom half?
+        this.top = Reference.isTopFileRegion(r.getFileRegion());
+        this.splitRow = CellUtil.cloneRow(KeyValue.createKeyValueFromKey(splitkey));
+        this.indexMaintainers = indexMaintainers;
+        this.viewConstants = viewConstants;
+        this.regionInfo = regionInfo;
+        this.parent = parent;
+        this.offset =
+                parent.getStartKey().length != 0 ? parent.getStartKey().length
+                        : parent.getEndKey().length;
+    }
+
+    protected boolean isTop() {
+        return this.top;
+    }
+
+    @Override
+    public HFileScanner getScanner(final boolean cacheBlocks, final boolean pread,
+            final boolean isCompaction) {
+        final HFileScanner s = super.getScanner(cacheBlocks, pread, isCompaction);
+        return new HFileScanner() {
+            final HFileScanner delegate = s;
+            public boolean atEnd = false;
+
+            public ByteBuffer getKey() {
+                if (atEnd) {
+                    return null;
+                }
+                boolean changeBottomKeys =
+                        regionInfo.getStartKey().length == 0 && splitRow.length != offset;
+                if (!top) {
+                    // For the first region, index row keys are prefixed with an empty byte array whose
+                    // length equals the region end key length. If the split row length differs from
+                    // that length, the prefix must be replaced with empty bytes of the split row
+                    // length, because after the split the region end key is the split row.
+                    if(!changeBottomKeys) return delegate.getKey();
+                }
+                // If it is top store file replace the StartKey of the Key with SplitKey
+                return getChangedKey(delegate.getKeyValue(), changeBottomKeys);
+            }
+            
+            private ByteBuffer getChangedKey(KeyValue kv, boolean changeBottomKeys) {
+                // new KeyValue(row, family, qualifier, timestamp, type, value)
+                byte[] newRowkey = getNewRowkeyByRegionStartKeyReplacedWithSplitKey(kv, changeBottomKeys);
+                KeyValue newKv =
+                        new KeyValue(newRowkey, 0, newRowkey.length, kv.getFamilyArray(),
+                                kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(),
+                                kv.getQualifierOffset(), kv.getQualifierLength(),
+                                kv.getTimestamp(), Type.codeToType(kv.getTypeByte()), null, 0, 0);
+                ByteBuffer keyBuffer = ByteBuffer.wrap(newKv.getKey());
+                return keyBuffer;
+            }
+
+            private byte[] getNewRowkeyByRegionStartKeyReplacedWithSplitKey(KeyValue kv, boolean changeBottomKeys) {
+                int lenOfRemainingKey = kv.getRowLength() - offset;
+                byte[] keyReplacedStartKey = new byte[lenOfRemainingKey + splitRow.length];
+                System.arraycopy(changeBottomKeys ? new byte[splitRow.length] : splitRow, 0,
+                    keyReplacedStartKey, 0, splitRow.length);
+                System.arraycopy(kv.getRowArray(), kv.getRowOffset() + offset, keyReplacedStartKey,
+                    splitRow.length, lenOfRemainingKey);
+                return keyReplacedStartKey;
+            }
+
+            public String getKeyString() {
+                if (atEnd) {
+                    return null;
+                }
+                return Bytes.toStringBinary(getKey());
+            }
+
+            public ByteBuffer getValue() {
+                if (atEnd) {
+                    return null;
+                }
+                return delegate.getValue();
+            }
+
+            public String getValueString() {
+                if (atEnd) {
+                    return null;
+                }
+                return Bytes.toStringBinary(getValue());
+            }
+
+            /**
+             * Returns the KeyValue the scanner is positioned on, with its row rewritten when
+             * required.
+             *
+             * The delegate's KeyValue is returned untouched only for a bottom file whose keys do
+             * not need changing. In every other case a new KeyValue is built whose row comes from
+             * {@code getNewRowkeyByRegionStartKeyReplacedWithSplitKey}; family, qualifier,
+             * timestamp, type, value and tags are taken from the delegate's KeyValue.
+             *
+             * @return the (possibly rewritten) current KeyValue, or null once the scanner has
+             *         been marked as at its end
+             */
+            public KeyValue getKeyValue() {
+                if (atEnd) {
+                    return null;
+                }
+                KeyValue current = delegate.getKeyValue();
+                boolean changeBottomKeys =
+                        regionInfo.getStartKey().length == 0 && splitRow.length != offset;
+                if (!top && !changeBottomKeys) {
+                    return current;
+                }
+                // Replace the leading region-start-key part of the row and rebuild the KeyValue
+                // around the new row.
+                byte[] newRow =
+                        getNewRowkeyByRegionStartKeyReplacedWithSplitKey(current, changeBottomKeys);
+                return new KeyValue(newRow, 0, newRow.length, current.getFamilyArray(),
+                        current.getFamilyOffset(), current.getFamilyLength(),
+                        current.getQualifierArray(), current.getQualifierOffset(),
+                        current.getQualifierLength(), current.getTimestamp(),
+                        Type.codeToType(current.getTypeByte()), current.getValueArray(),
+                        current.getValueOffset(), current.getValueLength(), current.getTags());
+            }
+
+            /**
+             * Advances to the next KeyValue that belongs to this half of the split.
+             *
+             * KeyValues rejected by {@code isSatisfiedMidKeyCondition} are skipped. When the
+             * delegate runs out of entries the scanner is marked as at its end.
+             *
+             * @return true if the scanner now rests on a KeyValue of this half, false if the
+             *         scanner is (or already was) at its end
+             * @throws IOException if the delegate scanner fails
+             */
+            public boolean next() throws IOException {
+                if (atEnd) {
+                    return false;
+                }
+                while (delegate.next()) {
+                    // Top store file: keep rows whose actual row key >= split key.
+                    // Bottom store file: keep rows whose actual row key < split key.
+                    if (isSatisfiedMidKeyCondition(delegate.getKeyValue())) {
+                        return true;
+                    }
+                }
+                atEnd = true;
+                return false;
+            }
+
+            /**
+             * Convenience overload: seeks before {@code key} using the whole array
+             * (offset 0, length {@code key.length}).
+             */
+            public boolean seekBefore(byte[] key) throws IOException {
+                return seekBefore(key, 0, key.length);
+            }
+
+            /**
+             * Seeks the delegate to the entry before the given key, honouring the half this
+             * reader represents.
+             *
+             * For a bottom file, a key at or past the split key is clamped to the split key.
+             * For a top file, the seek is refused when the file has no first key or when the
+             * key is not greater than the first key; otherwise the key is translated with
+             * {@code getKeyPresentInHFiles} before delegating.
+             *
+             * @return the delegate's seekBefore result, or false when the seek is refused
+             * @throws IOException if the delegate scanner fails
+             */
+            public boolean seekBefore(byte[] key, int offset, int length) throws IOException {
+                if (!top) {
+                    // The equals sign isn't strictly necessary; it is kept to be consistent
+                    // with seekTo.
+                    boolean atOrPastSplit =
+                            getComparator().compare(key, offset, length, splitkey, 0,
+                                splitkey.length) >= 0;
+                    if (atOrPastSplit) {
+                        return delegate.seekBefore(splitkey, 0, splitkey.length);
+                    }
+                    return delegate.seekBefore(key, offset, length);
+                }
+                byte[] firstKey = getFirstKey();
+                // A null first key means the file is empty, so there is nothing to seek before.
+                if (firstKey == null
+                        || getComparator().compare(key, offset, length, firstKey, 0,
+                            firstKey.length) <= 0) {
+                    return false;
+                }
+                KeyValue keyInHFile = getKeyPresentInHFiles(key);
+                return delegate.seekBefore(keyInHFile.getBuffer(), keyInHFile.getKeyOffset(),
+                    keyInHFile.getKeyLength());
+            }
+
+            /**
+             * Positions the scanner on the first KeyValue of the underlying file that belongs
+             * to this half of the split.
+             *
+             * The delegate is seeked to its first entry and then advanced past every KeyValue
+             * rejected by {@code isSatisfiedMidKeyCondition}. Whenever no suitable KeyValue is
+             * found, whether the file is empty or the delegate is exhausted while skipping, the
+             * scanner is marked as at its end so that the accessors return null instead of
+             * reading from an exhausted delegate.
+             *
+             * @return true if the scanner rests on a KeyValue of this half, false otherwise
+             * @throws IOException if the delegate scanner fails
+             */
+            public boolean seekTo() throws IOException {
+                if (!delegate.seekTo()) {
+                    atEnd = true;
+                    return false;
+                }
+                // Top store file: first row whose actual row key >= split key.
+                // Bottom store file: first row whose actual row key < split key.
+                while (!isSatisfiedMidKeyCondition(delegate.getKeyValue())) {
+                    if (!delegate.next()) {
+                        // Exhausted without a match: record it, exactly as next() does.
+                        atEnd = true;
+                        return false;
+                    }
+                }
+                return true;
+            }
+
+            /**
+             * Convenience overload: seeks to {@code key} using the whole array
+             * (offset 0, length {@code key.length}).
+             */
+            public int seekTo(byte[] key) throws IOException {
+                return seekTo(key, 0, key.length);
+            }
+
+            /**
+             * Seeks to the given key, honouring the half this reader represents.
+             *
+             * Top file: a key before the split key yields -1; otherwise the key is translated
+             * with {@code getKeyPresentInHFiles} and the delegate's result is returned.
+             * Bottom file: a key at or past the split key positions the delegate just before
+             * the split key and yields 1; otherwise the seek is delegated unchanged.
+             *
+             * @return -1, 1, or the delegate's seekTo result as described above
+             * @throws IOException if the delegate fails, or if a bottom-file seek past the
+             *             split key cannot be positioned before the split key
+             */
+            public int seekTo(byte[] key, int offset, int length) throws IOException {
+                boolean beforeSplit =
+                        getComparator().compare(key, offset, length, splitkey, 0, splitkey.length) < 0;
+                if (top) {
+                    if (beforeSplit) {
+                        return -1;
+                    }
+                    KeyValue keyInHFile = getKeyPresentInHFiles(key);
+                    return delegate.seekTo(keyInHFile.getBuffer(), keyInHFile.getKeyOffset(),
+                        keyInHFile.getKeyLength());
+                }
+                if (!beforeSplit) {
+                    // we would place the scanner in the second half.
+                    // it might be an error to return false here ever...
+                    if (!delegate.seekBefore(splitkey, 0, splitkey.length)) {
+                        throw new IOException(
+                                "Seeking for a key in bottom of file, but key exists in top of file, failed on seekBefore(midkey)");
+                    }
+                    return 1;
+                }
+                return delegate.seekTo(key, offset, length);
+            }
+
+            /**
+             * Convenience overload: re-seeks to {@code key} using the whole array
+             * (offset 0, length {@code key.length}).
+             */
+            public int reseekTo(byte[] key) throws IOException {
+                return reseekTo(key, 0, key.length);
+            }
+
+            /**
+             * Re-seeks to the given key, honouring the half this reader represents.
+             *
+             * Top file: a key before the split key yields -1; otherwise the key is translated
+             * with {@code getKeyPresentInHFiles} and the delegate's reseekTo result is returned.
+             * Bottom file: a key at or past the split key positions the delegate just before
+             * the split key and yields 1; otherwise the re-seek is delegated unchanged.
+             *
+             * @return -1, 1, or the delegate's reseekTo result as described above
+             * @throws IOException if the delegate fails, or if a bottom-file seek past the
+             *             split key cannot be positioned before the split key
+             */
+            public int reseekTo(byte[] key, int offset, int length) throws IOException {
+                boolean beforeSplit =
+                        getComparator().compare(key, offset, length, splitkey, 0, splitkey.length) < 0;
+                if (top) {
+                    if (beforeSplit) {
+                        return -1;
+                    }
+                    KeyValue keyInHFile = getKeyPresentInHFiles(key);
+                    return delegate.reseekTo(keyInHFile.getBuffer(), keyInHFile.getKeyOffset(),
+                        keyInHFile.getKeyLength());
+                }
+                if (!beforeSplit) {
+                    // we would place the scanner in the second half.
+                    // it might be an error to return false here ever...
+                    if (!delegate.seekBefore(splitkey, 0, splitkey.length)) {
+                        throw new IOException(
+                                "Seeking for a key in bottom of file, but key exists in top of file, failed on seekBefore(midkey)");
+                    }
+                    return 1;
+                }
+                return delegate.reseekTo(key, offset, length);
+            }
+
+            /** @return the HFile reader of the delegate scanner */
+            public org.apache.hadoop.hbase.io.hfile.HFile.Reader getReader() {
+                return this.delegate.getReader();
+            }
+
+            // TODO: Need to change as per IndexHalfStoreFileReader
+            /**
+             * @return whether the delegate scanner has been seeked; the half-file view
+             *         (split filtering, at-end state) is not taken into account here
+             */
+            public boolean isSeeked() {
+                return this.delegate.isSeeked();
+            }
+        };
+    }
+
+    /**
+     * Decides whether the given index KeyValue belongs to the half of the split that this
+     * reader serves.
+     *
+     * The data row key is rebuilt from the index row key (skipping the first {@code offset}
+     * bytes of the row) and compared with the split row: a top reader accepts data row keys
+     * that are >= the split row, a bottom reader accepts those that are < the split row.
+     *
+     * @param kv KeyValue read from the underlying store file
+     * @return true if the KeyValue should be exposed by this reader
+     */
+    private boolean isSatisfiedMidKeyCondition(KeyValue kv) {
+        // In case of a Delete type KV, let it go to both daughter regions. In the daughter
+        // where it belongs the tombstone really deletes a KV; in the other it just hangs
+        // around with no KV to act upon.
+        if (CellUtil.isDelete(kv) && kv.getValueLength() == 0) {
+            return true;
+        }
+        ImmutableBytesWritable indexRowKey =
+                new ImmutableBytesWritable(kv.getRowArray(), kv.getRowOffset() + offset,
+                        kv.getRowLength() - offset);
+        // Any maintainer can extract the view index id; that id then selects the maintainer
+        // that actually owns this row.
+        IndexMaintainer anyMaintainer = indexMaintainers.values().iterator().next();
+        byte[] viewIndexId = anyMaintainer.getViewIndexIdFromIndexRowKey(indexRowKey);
+        IndexMaintainer owningMaintainer =
+                indexMaintainers.get(new ImmutableBytesWritable(viewIndexId));
+        byte[] dataRowKey = owningMaintainer.buildDataRowKey(indexRowKey, this.viewConstants);
+        int cmp = Bytes.compareTo(dataRowKey, splitRow);
+        return top ? cmp >= 0 : cmp < 0;
+    }
+
+    /**
+     * Translates a seek key expressed in terms of the daughter region into the key as it is
+     * physically stored in the parent's HFiles.
+     *
+     * In case of a top half store, the passed key carries the start key of the daughter
+     * region, whereas the keys in the actual HFiles carry the start key of the old parent
+     * region. To perform the real seek in the HFiles the old key has to be rebuilt, which is
+     * done by replacing the daughter region start key prefix of the row with the parent region
+     * start key.
+     *
+     * NOTE(review): {@code key} is wrapped with {@code new KeyValue(key)}, so it is presumably
+     * expected to hold a complete KeyValue buffer rather than only a key -- confirm against
+     * the callers.
+     *
+     * @param key bytes of the KeyValue to translate
+     * @return a KeyValue created from the rewritten key
+     */
+    private KeyValue getKeyPresentInHFiles(byte[] key) {
+        KeyValue keyValue = new KeyValue(key);
+        int rowLength = keyValue.getRowLength();
+        int rowOffset = keyValue.getRowOffset();
+        // An empty parent start key is represented by a zero-filled array as long as the
+        // parent's end key.
+        byte[] parentStartKey =
+                parent.getStartKey().length == 0 ? new byte[parent.getEndKey().length] : parent
+                        .getStartKey();
+        // Likewise, an empty daughter start key falls back to the length of its end key.
+        int daughterStartKeyLength =
+                regionInfo.getStartKey().length == 0 ? regionInfo.getEndKey().length : regionInfo
+                        .getStartKey().length;
+
+        // This comes in case of a delete family marker on exactly the split row: seek to the
+        // first delete family KeyValue on the parent start key instead.
+        if (top
+                && 0 == keyValue.getValueLength()
+                && keyValue.getTimestamp() == HConstants.LATEST_TIMESTAMP
+                && Bytes.compareTo(keyValue.getRowArray(), keyValue.getRowOffset(),
+                    keyValue.getRowLength(), splitRow, 0, splitRow.length) == 0
+                && keyValue.isDeleteFamily()) {
+            KeyValue createFirstDeleteFamilyOnRow =
+                    KeyValue.createFirstDeleteFamilyOnRow(parentStartKey, keyValue.getFamily());
+            return createFirstDeleteFamilyOnRow;
+        }
+
+        // New row length: the old row with the daughter prefix swapped for the parent prefix.
+        short length = (short) (keyValue.getRowLength() - daughterStartKeyLength + parentStartKey.length);
+        // Layout of replacedKey: [row length][parent start key][rest of row][bytes after row].
+        // ROW_KEY_LENGTH is declared outside this method; presumably the size of the
+        // row-length prefix -- confirm.
+        byte[] replacedKey =
+                new byte[length + key.length - (rowOffset + rowLength) + ROW_KEY_LENGTH];
+        System.arraycopy(Bytes.toBytes(length), 0, replacedKey, 0, ROW_KEY_LENGTH);
+        System.arraycopy(parentStartKey, 0, replacedKey, ROW_KEY_LENGTH, parentStartKey.length);
+        // Row bytes that follow the daughter start key prefix.
+        System.arraycopy(keyValue.getRowArray(), keyValue.getRowOffset() + daughterStartKeyLength,
+            replacedKey, parentStartKey.length + ROW_KEY_LENGTH, keyValue.getRowLength()
+                    - daughterStartKeyLength);
+        // Everything in the input that follows the row is copied through unchanged.
+        System.arraycopy(key, rowOffset + rowLength, replacedKey,
+            parentStartKey.length + keyValue.getRowLength() - daughterStartKeyLength
+                    + ROW_KEY_LENGTH, key.length - (rowOffset + rowLength));
+        return KeyValue.createKeyValueFromKey(replacedKey);
+    }
+
+    /**
+     * Not supported by this reader.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public byte[] getLastKey() {
+        // This method won't get used for the index region. There is no need to call
+        // getClosestRowBefore on the index table. Also this is a split region, which cannot be
+        // split further.
+        throw new UnsupportedOperationException("Method is not implemented!");
+    }
+
+    /**
+     * @return always null, to indicate that the file is not splittable
+     */
+    @Override
+    public byte[] midkey() throws IOException {
+        // Returns null to indicate file is not splittable.
+        return null;
+    }
+
+    /** @return the first key as reported by the superclass, without any translation */
+    @Override
+    public byte[] getFirstKey() {
+        return super.getFirstKey();
+    }
+
+    /**
+     * @return always true; this reader never excludes itself from a scan based on the scan's
+     *         key range
+     */
+    @Override
+    public boolean passesKeyRangeFilter(Scan scan) {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4fa6146b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java
new file mode 100644
index 0000000..b04227f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.Reference;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+
+public class IndexHalfStoreFileReaderGenerator extends BaseRegionObserver {
+    
+    @Override
+    public Reader preStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment> ctx,
+            FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf,
+            Reference r, Reader reader) throws IOException {
+        TableName tableName = ctx.getEnvironment().getRegion().getTableDesc().getTableName();
+        HRegion region = ctx.getEnvironment().getRegion();
+        if (reader == null && r != null) {
+            Scan scan = MetaReader.getScanForTableName(tableName);
+            SingleColumnValueFilter scvf = null;
+            if (Reference.isTopFileRegion(r.getFileRegion())) {
+                scvf = new SingleColumnValueFilter(HConstants.CATALOG_FAMILY,
+                        HConstants.SPLITB_QUALIFIER, CompareOp.EQUAL, region.getRegionInfo().toByteArray());
+                scvf.setFilterIfMissing(true);
+            } else {
+                scvf = new SingleColumnValueFilter(HConstants.CATALOG_FAMILY,
+                        HConstants.SPLITA_QUALIFIER, CompareOp.EQUAL, region.getRegionInfo().toByteArray());
+                scvf.setFilterIfMissing(true);
+            }
+            if(scvf != null) scan.setFilter(scvf);
+            HRegionInfo parentRegion = null;
+            HTable metaTable = null;
+            PhoenixConnection conn = null;
+            try {
+                metaTable = new HTable(ctx.getEnvironment().getConfiguration(), TableName.META_TABLE_NAME);
+                ResultScanner scanner = metaTable.getScanner(scan);
+                Result result = scanner.next();
+                if (result == null || result.isEmpty()) return reader;
+                parentRegion = HRegionInfo.getHRegionInfo(result);
+            } finally {
+                if (metaTable != null) metaTable.close();
+            }
+            try {
+                conn = QueryUtil.getConnection(ctx.getEnvironment().getConfiguration()).unwrap(
+                            PhoenixConnection.class);
+                String userTableName = MetaDataUtil.getUserTableName(tableName.getNameAsString());
+                PTable dataTable = PhoenixRuntime.getTable(conn, userTableName);
+                List<PTable> indexes = dataTable.getIndexes();
+                Map<ImmutableBytesWritable, IndexMaintainer> indexMaintainers =
+                        new HashMap<ImmutableBytesWritable, IndexMaintainer>();
+                for (PTable index : indexes) {
+                    if (index.getIndexType() == IndexType.LOCAL) {
+                        IndexMaintainer indexMaintainer = index.getIndexMaintainer(dataTable);
+                        indexMaintainers.put(new ImmutableBytesWritable(MetaDataUtil
+                                .getViewIndexIdDataType().toBytes(index.getViewIndexId())),
+                            indexMaintainer);
+                    }
+                }
+                if(indexMaintainers.isEmpty()) return reader;
+                byte[][] viewConstants = getViewConstants(dataTable);
+                return new IndexHalfStoreFileReader(fs, p, cacheConf, in, size, r, ctx
+                        .getEnvironment().getConfiguration(), indexMaintainers, viewConstants,
+                        region.getRegionInfo(), parentRegion);
+            } catch (ClassNotFoundException e) {
+                throw new IOException(e);
+            } catch (SQLException e) {
+                throw new IOException(e);
+            } finally {
+                if (conn != null) {
+                    try {
+                        conn.close();
+                    } catch (SQLException e) {
+                        throw new IOException(e);
+                    }
+                }
+            }
+        }
+        return reader;
+    }
+    
+    private byte[][] getViewConstants(PTable dataTable) {
+        int dataPosOffset = (dataTable.getBucketNum() != null ? 1 : 0) + (dataTable.isMultiTenant() ? 1 : 0);
+        byte[][] viewConstants = null;
+        int nViewConstants = 0;
+        if (dataTable.getType() == PTableType.VIEW) {
+            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+            List<PColumn> dataPkColumns = dataTable.getPKColumns();
+            for (int i = dataPosOffset; i < dataPkColumns.size(); i++) {
+                PColumn dataPKColumn = dataPkColumns.get(i);
+                if (dataPKColumn.getViewConstant() != null) {
+                    nViewConstants++;
+                }
+            }
+            if (nViewConstants > 0) {
+                viewConstants = new byte[nViewConstants][];
+                int j = 0;
+                for (int i = dataPosOffset; i < dataPkColumns.size(); i++) {
+                    PColumn dataPkColumn = dataPkColumns.get(i);
+                    if (dataPkColumn.getViewConstant() != null) {
+                        if (IndexUtil.getViewConstantValue(dataPkColumn, ptr)) {
+                            viewConstants[j++] = ByteUtil.copyKeyBytesIfNecessary(ptr);
+                        } else {
+                            throw new IllegalStateException();
+                        }
+                    }
+                }
+            }
+        }
+        return viewConstants;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4fa6146b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java
new file mode 100644
index 0000000..87e7d81
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java
@@ -0,0 +1,974 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REQUEST_REGION_SPLIT;
+import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_SPLIT;
+import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_SPLITTING;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.RegionTransition;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.catalog.MetaEditor;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.io.Reference;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.HasThread;
+import org.apache.hadoop.hbase.util.PairOfSameType;
+import org.apache.hadoop.hbase.zookeeper.ZKAssign;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.data.Stat;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Executes region split as a "transaction".  Call {@link #prepare()} to setup
+ * the transaction, {@link #execute(Server, RegionServerServices)} to run the
+ * transaction and {@link #rollback(Server, RegionServerServices)} to cleanup if execute fails.
+ *
+ * <p>Here is an example of how you would use this class:
+ * <pre>
+ *  IndexSplitTransaction st = new IndexSplitTransaction(parent, midKey);
+ *  if (!st.prepare()) return;
+ *  try {
+ *    st.execute(server, services);
+ *  } catch (IOException ioe) {
+ *    try {
+ *      st.rollback(server, services);
+ *      return;
+ *    } catch (RuntimeException e) {
+ *      myAbortable.abort("Failed split, abort");
+ *    }
+ *  }
+ * </pre>
+ * <p>This class is not thread safe.  Caller needs ensure split is run by
+ * one thread only.
+ */
+@InterfaceAudience.Private
+public class IndexSplitTransaction {
+  private static final Log LOG = LogFactory.getLog(IndexSplitTransaction.class);
+
+  /*
+   * Region to split
+   */
+  private final HRegion parent;
+  private HRegionInfo hri_a;
+  private HRegionInfo hri_b;
+  private long fileSplitTimeout = 30000;
+  private int znodeVersion = -1;
+
+  /*
+   * Row to split around
+   */
+  private final byte [] splitrow;
+
+  /**
+   * Types to add to the transaction journal.
+   * Each enum constant is a step in the split transaction, listed in the order in which the
+   * steps are declared. The journal is used to figure out how much needs to be rolled back.
+   */
+  enum JournalEntry {
+    /**
+     * Set region as in transition, set it into SPLITTING state.
+     */
+    SET_SPLITTING_IN_ZK,
+    /**
+     * We created the temporary split data directory.
+     */
+    CREATE_SPLIT_DIR,
+    /**
+     * Closed the parent region.
+     */
+    CLOSED_PARENT_REGION,
+    /**
+     * The parent has been taken out of the server's online regions list.
+     */
+    OFFLINED_PARENT,
+    /**
+     * Creation of the first daughter region has started.
+     */
+    STARTED_REGION_A_CREATION,
+    /**
+     * Creation of the second daughter region has started.
+     */
+    STARTED_REGION_B_CREATION,
+    /**
+     * Point of no return.
+     * If we got here, then the transaction is not recoverable other than by
+     * crashing out the regionserver.
+     */
+    PONR
+  }
+
+  /*
+   * Journal of how far the split transaction has progressed.
+   */
+  private final List<JournalEntry> journal = new ArrayList<JournalEntry>();
+
+  /**
+   * Constructor. Only stores its arguments; the inputs are checked later by
+   * {@link #prepare()}.
+   * @param r Region to split
+   * @param splitrow Row to split around
+   */
+  public IndexSplitTransaction(final HRegion r, final byte [] splitrow) {
+    this.parent = r;
+    this.splitrow = splitrow;
+  }
+
+  /**
+   * Validates the split inputs and, when they are usable, computes the region infos of the
+   * two daughter regions.
+   * @return <code>true</code> if the region is splittable and the split row is usable, else
+   * <code>false</code> (e.g. the region is already closed, the split row is null, equal to
+   * the start key, or outside the region's key range).
+   */
+  public boolean prepare() {
+    // The split key can be null if this region is unsplittable; i.e. has refs.
+    if (!this.parent.isSplittable() || this.splitrow == null) {
+      return false;
+    }
+    HRegionInfo parentInfo = this.parent.getRegionInfo();
+    parent.prepareToSplit();
+    byte [] startKey = parentInfo.getStartKey();
+    byte [] endKey = parentInfo.getEndKey();
+    // The split row must lie strictly inside the parent's key range.
+    boolean splitRowUsable = !Bytes.equals(startKey, splitrow)
+        && this.parent.getRegionInfo().containsRow(splitrow);
+    if (!splitRowUsable) {
+      LOG.info("Split row is not inside region key range or is equal to " +
+          "startkey: " + Bytes.toStringBinary(this.splitrow));
+      return false;
+    }
+    // Both daughters share one region id.
+    long daughterRegionId = getDaughterRegionIdTimestamp(parentInfo);
+    this.hri_a =
+        new HRegionInfo(parentInfo.getTable(), startKey, this.splitrow, false, daughterRegionId);
+    this.hri_b =
+        new HRegionInfo(parentInfo.getTable(), this.splitrow, endKey, false, daughterRegionId);
+    return true;
+  }
+
+  /**
+   * Picks the region id for the daughter regions: the current time, unless that would be
+   * smaller than the parent's region id, in which case the parent's id plus one is used.
+   * @param hri Parent {@link HRegionInfo}
+   * @return Daughter region id (timestamp) to use.
+   */
+  private static long getDaughterRegionIdTimestamp(final HRegionInfo hri) {
+    long now = EnvironmentEdgeManager.currentTimeMillis();
+    long parentRegionId = hri.getRegionId();
+    if (now >= parentRegionId) {
+      return now;
+    }
+    // Regionid is timestamp.  Can't be less than that of parent else will insert
+    // at wrong location in hbase:meta (See HBASE-710).
+    LOG.warn("Clock skew; parent regions id is " + parentRegionId +
+      " but current time here is " + now);
+    return parentRegionId + 1;
+  }
+
+  private static IOException closedByOtherException = new IOException(
+      "Failed to close region: already closed by another thread");
+
+  /**
+   * Prepare the regions and region files.
+   *
+   * Runs the coprocessor pre-split hooks, performs the steps before the point of no return
+   * via {@link #stepsBeforePONR(Server, RegionServerServices, boolean)}, journals the point
+   * of no return and then, unless running in testing mode, records the split in meta.
+   * @param server Hosting server instance.  Can be null when testing (won't try
+   * and update in zk if a null server)
+   * @param services Used to online/offline regions.
+   * @throws IOException If thrown, transaction failed.
+   *    Call {@link #rollback(Server, RegionServerServices)}
+   * @return Regions created
+   */
+  /* package */PairOfSameType<HRegion> createDaughters(final Server server,
+      final RegionServerServices services) throws IOException {
+    LOG.info("Starting split of region " + this.parent);
+    if ((server != null && server.isStopped()) ||
+        (services != null && services.isStopping())) {
+      throw new IOException("Server is stopped or stopping");
+    }
+    assert !this.parent.lock.writeLock().isHeldByCurrentThread():
+      "Unsafe to hold write lock while performing RPCs";
+
+    // Coprocessor callback (no-argument variant)
+    if (this.parent.getCoprocessorHost() != null) {
+      this.parent.getCoprocessorHost().preSplit();
+    }
+
+    // Coprocessor callback (variant that receives the split row)
+    if (this.parent.getCoprocessorHost() != null) {
+      this.parent.getCoprocessorHost().preSplit(this.splitrow);
+    }
+
+    // If true, no cluster to write meta edits to or to update znodes in.
+    boolean testing = server == null? true:
+        server.getConfiguration().getBoolean("hbase.testing.nocluster", false);
+    // Outside of testing the file split timeout may be overridden by configuration.
+    this.fileSplitTimeout = testing ? this.fileSplitTimeout :
+        server.getConfiguration().getLong("hbase.regionserver.fileSplitTimeout",
+          this.fileSplitTimeout);
+
+    PairOfSameType<HRegion> daughterRegions = stepsBeforePONR(server, services, testing);
+
+    // Coprocessors may contribute extra meta mutations; a true return value from the hook
+    // means the coprocessor is bypassing the split.
+    List<Mutation> metaEntries = new ArrayList<Mutation>();
+    if (this.parent.getCoprocessorHost() != null) {
+      if (this.parent.getCoprocessorHost().
+          preSplitBeforePONR(this.splitrow, metaEntries)) {
+        throw new IOException("Coprocessor bypassing region "
+            + this.parent.getRegionNameAsString() + " split.");
+      }
+      try {
+        // Every contributed mutation must be keyed by a parsable region name.
+        for (Mutation p : metaEntries) {
+          HRegionInfo.parseRegionName(p.getRow());
+        }
+      } catch (IOException e) {
+        LOG.error("Row key of mutation from coprossor is not parsable as region name."
+            + "Mutations from coprocessor should only for hbase:meta table.");
+        throw e;
+      }
+    }
+
+    // This is the point of no return.  Adding subsequent edits to .META. as we
+    // do below when we do the daughter opens adding each to .META. can fail in
+    // various interesting ways the most interesting of which is a timeout
+    // BUT the edits all go through (See HBASE-3872).  IF we reach the PONR
+    // then subsequent failures need to crash out this regionserver; the
+    // server shutdown processing should be able to fix-up the incomplete split.
+    // The offlined parent will have the daughters as extra columns.  If
+    // we leave the daughter regions in place and do not remove them when we
+    // crash out, then they will have their references to the parent in place
+    // still and the server shutdown fixup of .META. will point to these
+    // regions.
+    // We should add PONR JournalEntry before offlineParentInMeta, so even if
+    // OfflineParentInMeta times out, this will cause the regionserver to exit, and then
+    // master ServerShutdownHandler will fix daughter & avoid data loss. (See
+    // HBase-4562).
+    this.journal.add(JournalEntry.PONR);
+
+    // Edit parent in meta.  Offlines parent region and adds splita and splitb
+    // as an atomic update. See HBASE-7721. This update to META is what
+    // determines whether the region is split or not in case of failures.
+    // If it is successful, master will roll-forward, if not, master will rollback
+    // and assign the parent region.
+    if (!testing) {
+      if (metaEntries == null || metaEntries.isEmpty()) {
+        MetaEditor.splitRegion(server.getCatalogTracker(),
+            parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(),
+            daughterRegions.getSecond().getRegionInfo(), server.getServerName());
+      } else {
+        offlineParentInMetaAndputMetaEntries(server.getCatalogTracker(),
+          parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(), daughterRegions
+              .getSecond().getRegionInfo(), server.getServerName(), metaEntries);
+      }
+    }
+    return daughterRegions;
+  }
+
+  /**
+   * Performs every split step that comes before the point of no return (PONR),
+   * adding a {@link JournalEntry} for each step so that a later rollback knows
+   * what to undo.
+   * <p>In order: creates the split znode and waits for the master to acknowledge
+   * it (only when a server with a ZooKeeper watcher is supplied), creates the
+   * splits directory, closes the parent region, removes the parent from the
+   * online regions (unless testing), writes the split store files and creates
+   * both daughter regions.
+   * @param server Hosting server instance.  The zk steps are skipped when it, or
+   * its ZooKeeper watcher, is null.
+   * @param services Passed to the znode wait and used to remove the parent from
+   * the online regions (the latter is skipped when <code>testing</code> is true).
+   * @param testing True to skip removing the parent from the online regions.
+   * @return The two daughter regions: first daughter (a), then second daughter (b).
+   * @throws IOException If any step fails, or if closing the parent returned null
+   * without throwing.
+   */
+  public PairOfSameType<HRegion> stepsBeforePONR(final Server server,
+      final RegionServerServices services, boolean testing) throws IOException {
+    // Set ephemeral SPLITTING znode up in zk.  Mocked servers sometimes don't
+    // have zookeeper so don't do zk stuff if server or zookeeper is null
+    if (server != null && server.getZooKeeper() != null) {
+      try {
+        createNodeSplitting(server.getZooKeeper(),
+          parent.getRegionInfo(), server.getServerName(), hri_a, hri_b);
+      } catch (KeeperException e) {
+        throw new IOException("Failed creating PENDING_SPLIT znode on " +
+          this.parent.getRegionNameAsString(), e);
+      }
+    }
+    // Journaled even when the zk step was skipped; rollback re-checks for a
+    // null server/zookeeper before cleaning up.
+    this.journal.add(JournalEntry.SET_SPLITTING_IN_ZK);
+    if (server != null && server.getZooKeeper() != null) {
+      // After creating the split node, wait for master to transition it
+      // from PENDING_SPLIT to SPLITTING so that we can move on. We want master
+      // knows about it and won't transition any region which is splitting.
+      znodeVersion = getZKNode(server, services);
+    }
+
+    this.parent.getRegionFileSystem().createSplitsDir();
+    this.journal.add(JournalEntry.CREATE_SPLIT_DIR);
+
+    Map<byte[], List<StoreFile>> hstoreFilesToSplit = null;
+    Exception exceptionToThrow = null;
+    try{
+      hstoreFilesToSplit = this.parent.close(false);
+    } catch (Exception e) {
+      // Hold on to the failure; it is rethrown below, after the journal is updated.
+      exceptionToThrow = e;
+    }
+    if (exceptionToThrow == null && hstoreFilesToSplit == null) {
+      // The region was closed by a concurrent thread.  We can't continue
+      // with the split, instead we must just abandon the split.  If we
+      // reopen or split this could cause problems because the region has
+      // probably already been moved to a different server, or is in the
+      // process of moving to a different server.
+      exceptionToThrow = closedByOtherException;
+    }
+    // Journal the close in every case except "closed by someone else".  This
+    // includes the case where close() itself threw, so that rollback will
+    // re-initialize the parent region.
+    if (exceptionToThrow != closedByOtherException) {
+      this.journal.add(JournalEntry.CLOSED_PARENT_REGION);
+    }
+    if (exceptionToThrow != null) {
+      // Rethrow IOExceptions untouched; wrap anything else.
+      if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow;
+      throw new IOException(exceptionToThrow);
+    }
+    if (!testing) {
+      services.removeFromOnlineRegions(this.parent, null);
+    }
+    // Journaled even when testing skipped the actual removal.
+    this.journal.add(JournalEntry.OFFLINED_PARENT);
+
+    // TODO: If splitStoreFiles were multithreaded would we complete steps in
+    // less elapsed time?  St.Ack 20100920
+    //
+    // splitStoreFiles creates daughter region dirs under the parent splits dir
+    // Nothing to unroll here if failure -- clean up of CREATE_SPLIT_DIR will
+    // clean this up.
+    splitStoreFiles(hstoreFilesToSplit);
+
+    // Log to the journal that we are creating region A, the first daughter
+    // region.  We could fail halfway through.  If we do, we could have left
+    // stuff in fs that needs cleanup -- a storefile or two.  Thats why we
+    // add entry to journal BEFORE rather than AFTER the change.
+    this.journal.add(JournalEntry.STARTED_REGION_A_CREATION);
+    HRegion a = this.parent.createDaughterRegionFromSplits(this.hri_a);
+
+    // Ditto
+    this.journal.add(JournalEntry.STARTED_REGION_B_CREATION);
+    HRegion b = this.parent.createDaughterRegionFromSplits(this.hri_b);
+    return new PairOfSameType<HRegion>(a, b);
+  }
+
+  /**
+   * Perform time consuming opening of the daughter regions.
+   * <p>Both daughters are opened in parallel, each in its own
+   * {@link DaughterOpener} thread.  Nothing is opened when the server is stopped
+   * or the services are stopping; that case is only logged.
+   * @param server Hosting server instance.  Can be null when testing (won't try
+   * and update in zk if a null server)
+   * @param services Used to online/offline regions.  When null, the post-open
+   * deploy tasks and the adding to online regions are skipped.
+   * @param a first daughter region
+   * @param b second daughter region
+   * @throws IOException If thrown, transaction failed.
+   *          Call {@link #rollback(Server, RegionServerServices)}
+   */
+  /* package */void openDaughters(final Server server,
+      final RegionServerServices services, HRegion a, HRegion b)
+      throws IOException {
+    boolean stopped = server != null && server.isStopped();
+    boolean stopping = services != null && services.isStopping();
+    // TODO: Is this check needed here?
+    if (stopped || stopping) {
+      LOG.info("Not opening daughters " +
+          b.getRegionInfo().getRegionNameAsString() +
+          " and " +
+          a.getRegionInfo().getRegionNameAsString() +
+          " because stopping=" + stopping + ", stopped=" + stopped);
+    } else {
+      // Open daughters in parallel.
+      DaughterOpener aOpener = new DaughterOpener(server, a);
+      DaughterOpener bOpener = new DaughterOpener(server, b);
+      aOpener.start();
+      bOpener.start();
+      try {
+        aOpener.join();
+        bOpener.join();
+      } catch (InterruptedException e) {
+        throw (InterruptedIOException)new InterruptedIOException().initCause(e);
+      }
+      // Both threads have exited; surface the first recorded failure, a before b.
+      if (aOpener.getException() != null) {
+        throw new IOException("Failed " +
+          aOpener.getName(), aOpener.getException());
+      }
+      if (bOpener.getException() != null) {
+        throw new IOException("Failed " +
+          bOpener.getName(), bOpener.getException());
+      }
+      if (services != null) {
+        // NOTE(review): server is dereferenced below although the javadoc allows
+        // a null server -- confirm no caller passes a null server together with
+        // non-null services.
+        try {
+          // add 2nd daughter first (see HBASE-4335)
+          services.postOpenDeployTasks(b, server.getCatalogTracker());
+          // Should add it to OnlineRegions
+          services.addToOnlineRegions(b);
+          services.postOpenDeployTasks(a, server.getCatalogTracker());
+          services.addToOnlineRegions(a);
+        } catch (KeeperException ke) {
+          throw new IOException(ke);
+        }
+      }
+    }
+  }
+
+  /**
+   * Finish off split transaction, transition the zknode
+   * <p>Moves the parent's znode from SPLITTING to SPLIT, then keeps re-asserting
+   * the SPLIT state every 100ms until the znode is gone (version -1), the server
+   * is stopped or the services are stopping.  Finally runs the coprocessor
+   * <code>postSplit</code> hook if the parent has a coprocessor host.
+   * @param server Hosting server instance.  Can be null when testing (won't try
+   * and update in zk if a null server)
+   * @param services Used to online/offline regions.  May be null, in which case
+   * only the server's stopped flag can end the wait early.
+   * @param a first daughter region
+   * @param b second daughter region
+   * @throws IOException If thrown, transaction failed.
+   *          Call {@link #rollback(Server, RegionServerServices)}
+   */
+  /* package */void transitionZKNode(final Server server,
+      final RegionServerServices services, HRegion a, HRegion b)
+      throws IOException {
+    // Tell master about split by updating zk.  If we fail, abort.
+    if (server != null && server.getZooKeeper() != null) {
+      try {
+        this.znodeVersion = transitionSplittingNode(server.getZooKeeper(),
+          parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo(),
+          server.getServerName(), this.znodeVersion,
+          RS_ZK_REGION_SPLITTING, RS_ZK_REGION_SPLIT);
+
+        int spins = 0;
+        // Now wait for the master to process the split. We know it's done
+        // when the znode is deleted. The reason we keep tickling the znode is
+        // that it's possible for the master to miss an event.
+        do {
+          if (spins % 10 == 0) {
+            LOG.debug("Still waiting on the master to process the split for " +
+                this.parent.getRegionInfo().getEncodedName());
+          }
+          Thread.sleep(100);
+          // When this returns -1 it means the znode doesn't exist
+          this.znodeVersion = transitionSplittingNode(server.getZooKeeper(),
+            parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo(),
+            server.getServerName(), this.znodeVersion,
+            RS_ZK_REGION_SPLIT, RS_ZK_REGION_SPLIT);
+          spins++;
+          // services is null-checked here the same way openDaughters does, so a
+          // null services no longer turns a healthy wait into a wrapped NPE.
+        } while (this.znodeVersion != -1 && !server.isStopped()
+            && (services == null || !services.isStopping()));
+      } catch (Exception e) {
+        if (e instanceof InterruptedException) {
+          // Preserve the interrupt for code further up the stack.
+          Thread.currentThread().interrupt();
+        }
+        throw new IOException("Failed telling master about split", e);
+      }
+    }
+
+    // Coprocessor callback
+    if (this.parent.getCoprocessorHost() != null) {
+      this.parent.getCoprocessorHost().postSplit(a,b);
+    }
+
+    // Leaving here, the splitdir with its dross will be in place but since the
+    // split was successful, just leave it; it'll be cleaned when parent is
+    // deleted and cleaned up.
+  }
+
+  /**
+   * Wait for the splitting node to be transitioned from pending_split
+   * to splitting by master. That's how we are sure master has processed
+   * the event and is good with us to move on. If we don't get any update,
+   * we periodically transition the node so that master gets the callback.
+   * If the node is removed or is not in pending_split state any more,
+   * we abort the split.
+   * @param server Hosting server; supplies the ZooKeeper watcher, the expected
+   * server name and the stopped flag.
+   * @param services Polled for its stopping flag on every loop iteration.
+   * @return Version of the znode as read when it was found in the SPLITTING state.
+   * @throws IOException If the znode is gone, names another server or other
+   * daughters, moved to an unexpected state, or the server is stopping/stopped.
+   * Every failure raised inside the method, including the IOExceptions it
+   * throws itself, is wrapped in an outer "Failed getting SPLITTING znode"
+   * IOException.
+   */
+  private int getZKNode(final Server server,
+      final RegionServerServices services) throws IOException {
+    // Wait for the master to process the pending_split.
+    try {
+      int spins = 0;
+      Stat stat = new Stat();
+      ZooKeeperWatcher zkw = server.getZooKeeper();
+      ServerName expectedServer = server.getServerName();
+      String node = parent.getRegionInfo().getEncodedName();
+      while (!(server.isStopped() || services.isStopping())) {
+        // Every fifth pass (including the very first) re-assert the
+        // REQUEST_REGION_SPLIT state so that master gets the callback.
+        if (spins % 5 == 0) {
+          LOG.debug("Still waiting for master to process "
+            + "the pending_split for " + node);
+          transitionSplittingNode(zkw, parent.getRegionInfo(),
+            hri_a, hri_b, expectedServer, -1, RS_ZK_REQUEST_REGION_SPLIT,
+            RS_ZK_REQUEST_REGION_SPLIT);
+        }
+        Thread.sleep(100);
+        spins++;
+        byte [] data = ZKAssign.getDataNoWatch(zkw, node, stat);
+        if (data == null) {
+          throw new IOException("Data is null, splitting node "
+            + node + " no longer exists");
+        }
+        RegionTransition rt = RegionTransition.parseFrom(data);
+        EventType et = rt.getEventType();
+        if (et == RS_ZK_REGION_SPLITTING) {
+          // Master moved the node on; make sure it is still our split.
+          ServerName serverName = rt.getServerName();
+          if (!serverName.equals(expectedServer)) {
+            throw new IOException("Splitting node " + node + " is for "
+              + serverName + ", not us " + expectedServer);
+          }
+          // The payload carries the two daughter region infos.
+          byte [] payloadOfSplitting = rt.getPayload();
+          List<HRegionInfo> splittingRegions = HRegionInfo.parseDelimitedFrom(
+            payloadOfSplitting, 0, payloadOfSplitting.length);
+          assert splittingRegions.size() == 2;
+          HRegionInfo a = splittingRegions.get(0);
+          HRegionInfo b = splittingRegions.get(1);
+          if (!(hri_a.equals(a) && hri_b.equals(b))) {
+            throw new IOException("Splitting node " + node + " is for " + a + ", "
+              + b + ", not expected daughters: " + hri_a + ", " + hri_b);
+          }
+          // Master has processed it.
+          return stat.getVersion();
+        }
+        // Any state other than "still requested" means the split was taken away.
+        if (et != RS_ZK_REQUEST_REGION_SPLIT) {
+          throw new IOException("Splitting node " + node
+            + " moved out of splitting to " + et);
+        }
+      }
+      // Server is stopping/stopped
+      throw new IOException("Server is "
+        + (services.isStopping() ? "stopping" : "stopped"));
+    } catch (Exception e) {
+      if (e instanceof InterruptedException) {
+        // Restore the interrupt flag before translating to an IOException.
+        Thread.currentThread().interrupt();
+      }
+      throw new IOException("Failed getting SPLITTING znode on "
+        + parent.getRegionNameAsString(), e);
+    }
+  }
+
+  /**
+   * Runs the whole split transaction: creates the daughters, fires the
+   * coprocessor <code>preSplitAfterPONR</code> hook when the parent has a
+   * coprocessor host, then performs the steps that follow the point of no return.
+   * @param server Hosting server instance.  Can be null when testing (won't try
+   * and update in zk if a null server)
+   * @param services Used to online/offline regions.
+   * @return Regions created
+   * @throws IOException If thrown, transaction failed.
+   *          Call {@link #rollback(Server, RegionServerServices)}
+   * @see #rollback(Server, RegionServerServices)
+   */
+  public PairOfSameType<HRegion> execute(final Server server,
+      final RegionServerServices services)
+  throws IOException {
+    final PairOfSameType<HRegion> daughters = createDaughters(server, services);
+    // Coprocessor callback, fired once the daughters exist.
+    if (null != this.parent.getCoprocessorHost()) {
+      this.parent.getCoprocessorHost().preSplitAfterPONR();
+    }
+    final PairOfSameType<HRegion> opened = stepsAfterPONR(server, services, daughters);
+    return opened;
+  }
+
+  /**
+   * Performs the steps that follow the point of no return: opens both daughters
+   * and then transitions the split znode.
+   * @param server Hosting server instance.
+   * @param services Used to online/offline regions.
+   * @param regions The daughter regions, first then second.
+   * @return The same <code>regions</code> pair that was passed in.
+   * @throws IOException If opening the daughters or the znode transition fails.
+   */
+  public PairOfSameType<HRegion> stepsAfterPONR(final Server server,
+      final RegionServerServices services, PairOfSameType<HRegion> regions)
+      throws IOException {
+    final HRegion firstDaughter = regions.getFirst();
+    final HRegion secondDaughter = regions.getSecond();
+    openDaughters(server, services, firstDaughter, secondDaughter);
+    transitionZKNode(server, services, firstDaughter, secondDaughter);
+    return regions;
+  }
+
+  /**
+   * Offlines the parent in the meta table and adds both daughters, together with
+   * the extra meta mutations supplied by the coprocessor, in one call to
+   * {@link MetaEditor#mutateMetaTable}.
+   * <p>The mutations are sent in this order: the supplied
+   * <code>metaEntries</code>, the parent put (marked offline and split, with the
+   * daughters attached), then the puts for daughter A and daughter B.
+   * @param catalogTracker Catalog tracker used to reach the meta table.
+   * @param parent Region being split; a copy is marked offline and split.
+   * @param splitA First daughter.
+   * @param splitB Second daughter.
+   * @param serverName Server recorded as the location of both daughters.
+   * @param metaEntries Additional meta mutations.  Not modified by this method.
+   * @throws IOException If the meta update fails.
+   */
+  private void offlineParentInMetaAndputMetaEntries(CatalogTracker catalogTracker,
+      HRegionInfo parent, HRegionInfo splitA, HRegionInfo splitB,
+      ServerName serverName, List<Mutation> metaEntries) throws IOException {
+    // Work on a private copy so the caller's list is not silently extended
+    // with the parent and daughter puts.
+    List<Mutation> mutations = new ArrayList<Mutation>(metaEntries);
+    HRegionInfo copyOfParent = new HRegionInfo(parent);
+    copyOfParent.setOffline(true);
+    copyOfParent.setSplit(true);
+
+    //Put for parent
+    Put putParent = MetaEditor.makePutFromRegionInfo(copyOfParent);
+    MetaEditor.addDaughtersToPut(putParent, splitA, splitB);
+    mutations.add(putParent);
+
+    //Puts for daughters
+    Put putA = MetaEditor.makePutFromRegionInfo(splitA);
+    Put putB = MetaEditor.makePutFromRegionInfo(splitB);
+
+    addLocation(putA, serverName, 1); //these are new regions, openSeqNum = 1 is fine.
+    addLocation(putB, serverName, 1);
+    mutations.add(putA);
+    mutations.add(putB);
+    MetaEditor.mutateMetaTable(catalogTracker, mutations);
+  }
+
+  /**
+   * Adds the server location columns (host and port, start code, open sequence
+   * number) of the catalog family to the given put.
+   * @param p Put to add the columns to.
+   * @param sn Server whose host/port and start code are written.
+   * @param openSeqNum Open sequence number to write.
+   * @return The same put, <code>p</code>.
+   */
+  public Put addLocation(final Put p, final ServerName sn, long openSeqNum) {
+    final byte[] hostAndPort = Bytes.toBytes(sn.getHostAndPort());
+    final byte[] startCode = Bytes.toBytes(sn.getStartcode());
+    final byte[] seqNum = Bytes.toBytes(openSeqNum);
+    p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, hostAndPort);
+    p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER, startCode);
+    p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER, seqNum);
+    return p;
+  }
+
+  /*
+   * Thread that opens a single daughter region.
+   * Any Throwable raised by the open is captured and exposed through
+   * getException() rather than propagated out of run().
+   */
+  class DaughterOpener extends HasThread {
+    private final Server server;
+    private final HRegion r;
+    private Throwable t = null;
+
+    DaughterOpener(final Server s, final HRegion r) {
+      super((s == null? "null-services": s.getServerName())
+        + "-daughterOpener="
+        + r.getRegionInfo().getEncodedName());
+      this.r = r;
+      this.server = s;
+      setDaemon(true);
+    }
+
+    /**
+     * @return Null if open succeeded else exception that causes us fail open.
+     * Call it after this thread exits else you may get wrong view on result.
+     */
+    Throwable getException() {
+      return this.t;
+    }
+
+    @Override
+    public void run() {
+      try {
+        openDaughterRegion(this.server, this.r);
+      } catch (Throwable failure) {
+        // Remember the failure for whoever joins on this thread.
+        this.t = failure;
+      }
+    }
+  }
+
+  /**
+   * Opens the given daughter region, reporting progress through a
+   * {@link LoggingProgressable} when a server is available.
+   * @param server Hosting server; when null, no progress reporter is used.
+   * @param daughter Daughter region to open.
+   * @throws IOException
+   * @throws KeeperException
+   */
+  void openDaughterRegion(final Server server, final HRegion daughter)
+  throws IOException, KeeperException {
+    final HRegionInfo daughterInfo = daughter.getRegionInfo();
+    LoggingProgressable reporter = null;
+    if (server != null) {
+      // Interval, in milliseconds, between "Opening ..." log lines.
+      final long logInterval = server.getConfiguration().getLong(
+          "hbase.regionserver.split.daughter.open.log.interval", 10000);
+      reporter = new LoggingProgressable(daughterInfo, logInterval);
+    }
+    daughter.openHRegion(reporter);
+  }
+
+  /**
+   * Progress callback that logs an "Opening ..." line for a region, at most once
+   * per configured interval, and never asks for cancellation.
+   */
+  static class LoggingProgressable implements CancelableProgressable {
+    private final HRegionInfo hri;
+    private final long interval;
+    // Time of the last log line, in milliseconds; -1 until the first one.
+    private long lastLog = -1;
+
+    LoggingProgressable(final HRegionInfo hri, final long interval) {
+      this.interval = interval;
+      this.hri = hri;
+    }
+
+    @Override
+    public boolean progress() {
+      final long current = System.currentTimeMillis();
+      final boolean logDue = current - this.lastLog > this.interval;
+      if (logDue) {
+        LOG.info("Opening " + this.hri.getRegionNameAsString());
+        this.lastLog = current;
+      }
+      return true;
+    }
+  }
+
+  /**
+   * Splits every store file of the parent in parallel, one
+   * {@link StoreFileSplitter} task per file, and waits for all of them.
+   * @param hstoreFilesToSplit Store files to split, keyed by column family.
+   * @throws IOException If the map is null, if the tasks do not finish within
+   * <code>fileSplitTimeout</code> milliseconds, if the wait is interrupted
+   * (as an {@link InterruptedIOException}), or if any task failed.
+   */
+  private void splitStoreFiles(final Map<byte[], List<StoreFile>> hstoreFilesToSplit)
+      throws IOException {
+    if (hstoreFilesToSplit == null) {
+      // Could be null because close didn't succeed -- for now consider it fatal
+      throw new IOException("Close returned empty list of StoreFiles");
+    }
+    // The following code sets up a thread pool executor with as many slots as
+    // there's files to split. It then fires up everything, waits for
+    // completion and finally checks for any exception
+    //
+    // Count the store files themselves: the map is keyed by column family, so
+    // its size is the number of families, not the number of files.
+    int nbFiles = 0;
+    for (List<StoreFile> familyFiles: hstoreFilesToSplit.values()) {
+      nbFiles += familyFiles.size();
+    }
+    if (nbFiles == 0) {
+      // no file needs to be splitted.
+      return;
+    }
+    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+    builder.setNameFormat("StoreFileSplitter-%1$d");
+    ThreadFactory factory = builder.build();
+    ThreadPoolExecutor threadPool =
+      (ThreadPoolExecutor) Executors.newFixedThreadPool(nbFiles, factory);
+    List<Future<Void>> futures = new ArrayList<Future<Void>>(nbFiles);
+
+    // Split each store file.
+    for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) {
+      for (StoreFile sf: entry.getValue()) {
+        StoreFileSplitter sfs = new StoreFileSplitter(entry.getKey(), sf);
+        futures.add(threadPool.submit(sfs));
+      }
+    }
+    // Shutdown the pool
+    threadPool.shutdown();
+
+    // Wait for all the tasks to finish
+    try {
+      boolean stillRunning = !threadPool.awaitTermination(
+          this.fileSplitTimeout, TimeUnit.MILLISECONDS);
+      if (stillRunning) {
+        threadPool.shutdownNow();
+        // wait for the thread to shutdown completely.
+        while (!threadPool.isTerminated()) {
+          Thread.sleep(50);
+        }
+        throw new IOException("Took too long to split the" +
+            " files and create the references, aborting split");
+      }
+    } catch (InterruptedException e) {
+      throw (InterruptedIOException)new InterruptedIOException().initCause(e);
+    }
+
+    // Look for any exception
+    for (Future<Void> future: futures) {
+      try {
+        future.get();
+      } catch (InterruptedException e) {
+        throw (InterruptedIOException)new InterruptedIOException().initCause(e);
+      } catch (ExecutionException e) {
+        throw new IOException(e);
+      }
+    }
+  }
+
+  /**
+   * Utility class used to do the file splitting / reference writing
+   * in parallel instead of sequentially.  Each instance handles one store file.
+   */
+  class StoreFileSplitter implements Callable<Void> {
+    private final byte[] family;
+    private final StoreFile sf;
+
+    /**
+     * Constructor that takes what it needs to split
+     * @param family Family that contains the store file
+     * @param sf which file
+     */
+    public StoreFileSplitter(final byte[] family, final StoreFile sf) {
+      this.sf = sf;
+      this.family = family;
+    }
+
+    /**
+     * Splits the store file this task was created for.
+     * @return Always null.
+     * @throws IOException If splitting the store file fails.
+     */
+    @Override
+    public Void call() throws IOException {
+      splitStoreFile(family, sf);
+      return null;
+    }
+  }
+
+    /**
+     * Writes both references for one store file: a bottom reference for the
+     * first daughter, then a top reference for the second daughter.
+     * @param family Column family that contains the store file.
+     * @param sf Store file to split.
+     * @throws IOException If writing either reference fails.
+     */
+    private void splitStoreFile(final byte[] family, final StoreFile sf) throws IOException {
+        final HRegionFileSystem regionFs = this.parent.getRegionFileSystem();
+        final String cfName = Bytes.toString(family);
+        // Daughter A gets the bottom half (top == false).
+        splitStoreFile(this.hri_a, cfName, sf, this.splitrow, false, regionFs);
+        // Daughter B gets the top half (top == true).
+        splitStoreFile(this.hri_b, cfName, sf, this.splitrow, true, regionFs);
+    }
+
+    /**
+     * Writes a reference to one half of a store file into a daughter's family
+     * directory under the parent's splits directory.
+     * @param hri Daughter region the reference is written for.
+     * @param familyName Column family name, used as the last directory level.
+     * @param f Store file being referenced; its reader is closed first.
+     * @param splitRow Row the reference is created at.
+     * @param top True for a top reference, false for a bottom reference.
+     * @param fs Region file system supplying the region dir and file system.
+     * @return Result of writing the reference.
+     * @throws IOException If closing the reader or writing the reference fails.
+     */
+    private Path splitStoreFile(HRegionInfo hri, String familyName, StoreFile f, byte[] splitRow,
+            boolean top, HRegionFileSystem fs) throws IOException {
+        f.closeReader(true);
+        // <region dir>/<splits dir>/<daughter encoded name>/<family>
+        final Path splitsRoot = new Path(fs.getRegionDir(), HRegionFileSystem.REGION_SPLITS_DIR);
+        final Path daughterDir = new Path(splitsRoot, hri.getEncodedName());
+        final Path familyDir = new Path(daughterDir, familyName);
+        // Reference to the requested half of the store file.
+        final Reference ref;
+        if (top) {
+            ref = Reference.createTopReference(splitRow);
+        } else {
+            ref = Reference.createBottomReference(splitRow);
+        }
+        // The reference keeps the store file's name and gets the parent region's
+        // encoded name as a dot separated suffix.
+        final String parentEncodedName = this.parent.getRegionInfo().getEncodedName();
+        final String referenceName = f.getPath().getName() + "." + parentEncodedName;
+        final Path referencePath = new Path(familyDir, referenceName);
+        return ref.write(fs.getFileSystem(), referencePath);
+    }
+
+  /**
+   * Undoes the split by walking the journal backwards and reverting each
+   * recorded step.  The coprocessor <code>preRollBackSplit</code> hook runs
+   * first; <code>postRollBackSplit</code> runs only when the whole journal was
+   * unwound, i.e. not when a PONR entry was found.
+   * @param server Hosting server instance (May be null when testing).
+   * @param services Used to put the parent back into the online regions; that
+   * step is skipped when null.
+   * @throws IOException If thrown, rollback failed.  Take drastic action.
+   * @return True if we successfully rolled back, false if we got to the point
+   * of no return and so now need to abort the server to minimize damage.
+   */
+  @SuppressWarnings("deprecation")
+  public boolean rollback(final Server server, final RegionServerServices services)
+  throws IOException {
+    // Coprocessor callback
+    if (this.parent.getCoprocessorHost() != null) {
+      this.parent.getCoprocessorHost().preRollBackSplit();
+    }
+
+    // Never reassigned below; the only way to return false is the PONR case.
+    boolean result = true;
+    ListIterator<JournalEntry> iterator =
+      this.journal.listIterator(this.journal.size());
+    // Iterate in reverse.
+    while (iterator.hasPrevious()) {
+      JournalEntry je = iterator.previous();
+      switch(je) {
+
+      case SET_SPLITTING_IN_ZK:
+        // Remove the split znode, if there is a zookeeper to talk to.
+        if (server != null && server.getZooKeeper() != null) {
+          cleanZK(server, this.parent.getRegionInfo());
+        }
+        break;
+
+      case CREATE_SPLIT_DIR:
+        // Re-enable writes on the parent and remove the splits directory.
+        this.parent.writestate.writesEnabled = true;
+        this.parent.getRegionFileSystem().cleanupSplitsDir();
+        break;
+
+      case CLOSED_PARENT_REGION:
+        try {
+          // So, this returns a seqid but if we just closed and then reopened, we
+          // should be ok. On close, we flushed using sequenceid obtained from
+          // hosting regionserver so no need to propagate the sequenceid returned
+          // out of initialize below up into regionserver as we normally do.
+          // TODO: Verify.
+          this.parent.initialize();
+        } catch (IOException e) {
+          // Failing to reopen the parent is logged and rethrown unchecked.
+          LOG.error("Failed rollbacking CLOSED_PARENT_REGION of region " +
+            this.parent.getRegionNameAsString(), e);
+          throw new RuntimeException(e);
+        }
+        break;
+
+      case STARTED_REGION_A_CREATION:
+        this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_a);
+        break;
+
+      case STARTED_REGION_B_CREATION:
+        this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_b);
+        break;
+
+      case OFFLINED_PARENT:
+        // Put the parent back into the online regions.
+        if (services != null) services.addToOnlineRegions(this.parent);
+        break;
+
+      case PONR:
+        // We got to the point-of-no-return so we need to just abort. Return
+        // immediately.  Do not clean up created daughter regions.  They need
+        // to be in place so we don't delete the parent region mistakenly.
+        // See HBASE-3872.
+        return false;
+
+      default:
+        throw new RuntimeException("Unhandled journal entry: " + je);
+      }
+    }
+    // Coprocessor callback
+    if (this.parent.getCoprocessorHost() != null) {
+      this.parent.getCoprocessorHost().postRollBackSplit();
+    }
+    return result;
+  }
+
+  /**
+   * @return Region info of the first daughter (a).
+   */
+  HRegionInfo getFirstDaughter() {
+    return this.hri_a;
+  }
+
+  /**
+   * @return Region info of the second daughter (b).
+   */
+  HRegionInfo getSecondDaughter() {
+    return this.hri_b;
+  }
+
+  /**
+   * Deletes the split znode of the given region: first tries it in the
+   * REQUEST_REGION_SPLIT state and, when that delete reports false, in the
+   * REGION_SPLITTING state.  A missing node is only logged; any other
+   * KeeperException aborts the server.
+   * @param server Hosting server; supplies the ZooKeeper watcher and server name.
+   * @param hri Region whose znode is removed.
+   */
+  private static void cleanZK(final Server server, final HRegionInfo hri) {
+    try {
+      // Only delete if its in expected state; could have been hijacked.
+      final boolean deletedPending = ZKAssign.deleteNode(server.getZooKeeper(),
+          hri.getEncodedName(), RS_ZK_REQUEST_REGION_SPLIT, server.getServerName());
+      if (deletedPending) {
+        return;
+      }
+      ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(),
+        RS_ZK_REGION_SPLITTING, server.getServerName());
+    } catch (KeeperException.NoNodeException noNode) {
+      LOG.info("Failed cleanup zk node of " + hri.getRegionNameAsString(), noNode);
+    } catch (KeeperException ke) {
+      server.abort("Failed cleanup of " + hri.getRegionNameAsString(), ke);
+    }
+  }
+
+  /**
+   * Creates a new ephemeral node in the PENDING_SPLIT state for the specified region.
+   * Create it ephemeral in case regionserver dies mid-split.  The node's payload
+   * is the two daughter region infos.
+   *
+   * <p>Does not transition nodes from other states.  If a node already exists
+   * for this region, a {@link NodeExistsException} will be thrown.
+   *
+   * @param zkw zk reference
+   * @param region region to be created as offline
+   * @param serverName server event originates from
+   * @param a Daughter a of split
+   * @param b Daughter b of split
+   * @throws KeeperException
+   * @throws IOException If the ephemeral node could not be created.
+   */
+  public static void createNodeSplitting(final ZooKeeperWatcher zkw, final HRegionInfo region,
+      final ServerName serverName, final HRegionInfo a,
+      final HRegionInfo b) throws KeeperException, IOException {
+    final String encodedName = region.getEncodedName();
+    LOG.debug(zkw.prefix("Creating ephemeral node for " +
+      encodedName + " in PENDING_SPLIT state"));
+    final byte [] daughters = HRegionInfo.toDelimitedByteArray(a, b);
+    final RegionTransition transition = RegionTransition.createRegionTransition(
+      RS_ZK_REQUEST_REGION_SPLIT, region.getRegionName(), serverName, daughters);
+    final String znode = ZKAssign.getNodeName(zkw, region.getEncodedName());
+    final boolean created =
+      ZKUtil.createEphemeralNodeAndWatch(zkw, znode, transition.toByteArray());
+    if (!created) {
+      throw new IOException("Failed create of ephemeral " + znode);
+    }
+  }
+
+  /**
+   * Transitions an existing ephemeral node for the specified region which is
+   * currently in the begin state to be in the end state, carrying the two
+   * daughters as payload. Master cleans up the final SPLIT znode when it reads
+   * it (or if we crash, zk will clean it up).
+   *
+   * <p>Does not transition nodes from other states. If for some reason the
+   * node could not be transitioned, the method returns -1. If the transition
+   * is successful, the version of the node after transition is returned.
+   *
+   * <p>This method can fail and return -1 for three different reasons:
+   * <ul><li>Node for this region does not exist</li>
+   * <li>Node for this region is not in the begin state</li>
+   * <li>After verifying the begin state, update fails because of wrong version
+   * (this should never actually happen since an RS only does this transition
+   * following a transition to the begin state. If two RS are conflicting, one would
+   * fail the original transition to the begin state and not this transition)</li>
+   * </ul>
+   *
+   * <p>Does not set any watches.
+   *
+   * <p>This method should only be used by a RegionServer when splitting a region.
+   *
+   * @param zkw zk reference
+   * @param parent region whose znode is transitioned
+   * @param a Daughter a of split
+   * @param b Daughter b of split
+   * @param serverName server event originates from
+   * @param znodeVersion expected version of data before modification
+   * @param beginState the expected current state the znode should be
+   * @param endState the state to be transition to
+   * @return version of node after transition, -1 if unsuccessful transition
+   * @throws KeeperException if unexpected zookeeper exception
+   * @throws IOException
+   */
+  public static int transitionSplittingNode(ZooKeeperWatcher zkw,
+      HRegionInfo parent, HRegionInfo a, HRegionInfo b, ServerName serverName,
+      final int znodeVersion, final EventType beginState,
+      final EventType endState) throws KeeperException, IOException {
+    final byte [] daughters = HRegionInfo.toDelimitedByteArray(a, b);
+    final int versionAfter = ZKAssign.transitionNode(zkw, parent, serverName,
+      beginState, endState, znodeVersion, daughters);
+    return versionAfter;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4fa6146b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java
new file mode 100644
index 0000000..1afe6c7
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.catalog.MetaEditor;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.IndexSplitTransaction;
+import org.apache.hadoop.hbase.util.PairOfSameType;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+/**
+ * Region observer that, when a region of a non-index table is about to split, runs an
+ * {@link IndexSplitTransaction} on the region returned by
+ * {@link IndexUtil#getIndexRegion(RegionCoprocessorEnvironment)} using the same split key,
+ * and appends the resulting parent/daughter rows to the meta mutations of the split.
+ *
+ * <p>The transaction and the daughters created before the point of no return (PONR) are
+ * kept in instance fields so that {@link #preSplitAfterPONR(ObserverContext)} can finish
+ * the work. Those fields describe exactly one split attempt: they are cleared when a new
+ * attempt starts and once they have been consumed.
+ */
+public class LocalIndexSplitter extends BaseRegionObserver {
+
+    private static final Log LOG = LogFactory.getLog(LocalIndexSplitter.class);
+
+    // Hand-over state between preSplitBeforePONR and preSplitAfterPONR for the attempt
+    // in flight. Both are null whenever no index split has been prepared.
+    private IndexSplitTransaction st = null;
+    private PairOfSameType<HRegion> daughterRegions = null;
+
+    /**
+     * Prepares and runs the pre-PONR steps of the index region split and adds the meta
+     * rows for the index parent and both index daughters to {@code metaEntries}.
+     *
+     * <p>Does nothing for meta/sequence tables, for tables flagged as local index tables,
+     * or when no index region is found. If the index split cannot be prepared the split
+     * is bypassed via {@code ctx.bypass()}.
+     *
+     * @param ctx observer context of the splitting region
+     * @param splitKey key at which the region is being split; reused for the index region
+     * @param metaEntries meta mutations of the split; index region puts are appended here
+     * @throws IOException if any step of the index split transaction fails
+     */
+    @Override
+    public void preSplitBeforePONR(ObserverContext<RegionCoprocessorEnvironment> ctx,
+            byte[] splitKey, List<Mutation> metaEntries) throws IOException {
+        // Forget whatever an earlier attempt left behind. Without this, an attempt that
+        // returns early below would leave stale state for preSplitAfterPONR to act on
+        // (presumably possible after a rolled-back split - TODO confirm observer lifecycle).
+        st = null;
+        daughterRegions = null;
+
+        RegionCoprocessorEnvironment environment = ctx.getEnvironment();
+        HTableDescriptor tableDesc = environment.getRegion().getTableDesc();
+        if (SchemaUtil.isMetaTable(tableDesc.getName())
+                || SchemaUtil.isSequenceTable(tableDesc.getName())) {
+            return;
+        }
+        // Only a non-index table drives the index split; the index table's own split
+        // hooks must not recurse into another index split.
+        if (isLocalIndexTable(tableDesc)) {
+            return;
+        }
+        HRegion indexRegion = IndexUtil.getIndexRegion(environment);
+        if (indexRegion == null) return;
+
+        RegionServerServices rss = environment.getRegionServerServices();
+        IndexSplitTransaction transaction = new IndexSplitTransaction(indexRegion, splitKey);
+        if (!transaction.prepare()) {
+            LOG.error("Prepare for the table " + indexRegion.getTableDesc().getNameAsString()
+                    + " failed. So returning null. ");
+            ctx.bypass();
+            return;
+        }
+        indexRegion.forceSplit(splitKey);
+        PairOfSameType<HRegion> daughters = transaction.stepsBeforePONR(rss, rss, false);
+
+        // Parent row: marked offline and split, with references to both daughters.
+        HRegionInfo copyOfParent = new HRegionInfo(indexRegion.getRegionInfo());
+        copyOfParent.setOffline(true);
+        copyOfParent.setSplit(true);
+        Put putParent = MetaEditor.makePutFromRegionInfo(copyOfParent);
+        MetaEditor.addDaughtersToPut(putParent, daughters.getFirst().getRegionInfo(),
+            daughters.getSecond().getRegionInfo());
+        metaEntries.add(putParent);
+
+        // Daughter rows, each located on this region server.
+        Put putA = MetaEditor.makePutFromRegionInfo(daughters.getFirst().getRegionInfo());
+        Put putB = MetaEditor.makePutFromRegionInfo(daughters.getSecond().getRegionInfo());
+        transaction.addLocation(putA, rss.getServerName(), 1);
+        transaction.addLocation(putB, rss.getServerName(), 1);
+        metaEntries.add(putA);
+        metaEntries.add(putB);
+
+        // Publish the state only once every pre-PONR step has succeeded.
+        st = transaction;
+        daughterRegions = daughters;
+    }
+
+    /**
+     * Runs the post-PONR steps of the index split prepared by
+     * {@link #preSplitBeforePONR(ObserverContext, byte[], List)}; a no-op when nothing
+     * was prepared.
+     *
+     * @param ctx observer context of the splitting region
+     * @throws IOException if the post-PONR steps of the index split fail
+     */
+    @Override
+    public void preSplitAfterPONR(ObserverContext<RegionCoprocessorEnvironment> ctx)
+            throws IOException {
+        if (st == null || daughterRegions == null) return;
+        IndexSplitTransaction transaction = st;
+        PairOfSameType<HRegion> daughters = daughterRegions;
+        // The prepared state is single-use; clear it before running so it can never be
+        // replayed, even if the steps below throw.
+        st = null;
+        daughterRegions = null;
+        RegionCoprocessorEnvironment environment = ctx.getEnvironment();
+        HRegionServer rs = (HRegionServer) environment.getRegionServerServices();
+        transaction.stepsAfterPONR(rs, rs, daughters);
+    }
+
+    /** Returns true when the descriptor carries the local-index-table flag set to TRUE. */
+    private static boolean isLocalIndexTable(HTableDescriptor tableDesc) {
+        byte[] flag = tableDesc.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES);
+        return flag != null && Boolean.TRUE.equals(PDataType.BOOLEAN.toObject(flag));
+    }
+}