Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/08/12 01:22:40 UTC
[2/2] git commit: PHOENIX-1058 Support index region split on its corresponding data region split (Rajeshbabu)
PHOENIX-1058 Support index region split on its corresponding data region split (Rajeshbabu)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/4fa6146b
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/4fa6146b
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/4fa6146b
Branch: refs/heads/4.0
Commit: 4fa6146b3a2b0352f2b71e176a36d08499307438
Parents: 2ed929a
Author: Rajeshbabu Chintaguntla <ra...@huawei.com>
Authored: Mon Aug 11 23:53:34 2014 +0530
Committer: James Taylor <jt...@salesforce.com>
Committed: Mon Aug 11 16:26:01 2014 -0700
----------------------------------------------------------------------
.../org/apache/phoenix/end2end/BaseQueryIT.java | 6 +
.../org/apache/phoenix/end2end/QueryIT.java | 24 +-
.../phoenix/end2end/index/LocalIndexIT.java | 110 +++
.../regionserver/IndexHalfStoreFileReader.java | 458 +++++++++
.../IndexHalfStoreFileReaderGenerator.java | 159 +++
.../regionserver/IndexSplitTransaction.java | 974 +++++++++++++++++++
.../hbase/regionserver/LocalIndexSplitter.java | 101 ++
.../apache/phoenix/index/IndexMaintainer.java | 32 +
...ocalIndexParallelIteratorRegionSplitter.java | 1 +
.../query/ConnectionQueryServicesImpl.java | 17 +
.../java/org/apache/phoenix/query/BaseTest.java | 9 +-
11 files changed, 1876 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4fa6146b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseQueryIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseQueryIT.java
index d736612..4263dd2 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseQueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseQueryIT.java
@@ -113,6 +113,12 @@ public abstract class BaseQueryIT extends BaseClientManagedTimeIT {
+ " B_STRING, " + " A_DATE)" });
testCases.add(new String[] { "CREATE INDEX " + ATABLE_INDEX_NAME + " ON aTable (a_integer) INCLUDE ("
+ " A_STRING, " + " B_STRING, " + " A_DATE)" });
+ testCases.add(new String[] { "CREATE LOCAL INDEX " + ATABLE_INDEX_NAME + " ON aTable (a_integer DESC) INCLUDE ("
+ + " A_STRING, " + " B_STRING, " + " A_DATE)" });
+ testCases.add(new String[] { "CREATE LOCAL INDEX " + ATABLE_INDEX_NAME + " ON aTable (a_integer, a_string) INCLUDE ("
+ + " B_STRING, " + " A_DATE)" });
+ testCases.add(new String[] { "CREATE LOCAL INDEX " + ATABLE_INDEX_NAME + " ON aTable (a_integer) INCLUDE ("
+ + " A_STRING, " + " B_STRING, " + " A_DATE)" });
testCases.add(new String[] { "" });
return testCases;
}
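
Each String[] above is an index-DDL variant that the parameterized query tests run against aTable (the empty-string entry is the no-index baseline). For reference, the first new case concatenates to the statement below, assuming ATABLE_INDEX_NAME resolves to "ATABLE_IDX" (the resolved name is illustrative, not taken from a test run):

    CREATE LOCAL INDEX ATABLE_IDX ON aTable (a_integer DESC) INCLUDE ( A_STRING,  B_STRING,  A_DATE)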
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4fa6146b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java
index d9c3862..35140f4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java
@@ -739,19 +739,17 @@ public class QueryIT extends BaseQueryIT {
byte[] tableName = Bytes.toBytes(ATABLE_NAME);
admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
- if (admin.tableExists(TableName.valueOf(MetaDataUtil.getLocalIndexTableName("atable")))) {
- HTable htable = (HTable) conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(tableName);
- htable.clearRegionCache();
- int nRegions = htable.getRegionLocations().size();
- admin.split(tableName, ByteUtil.concat(Bytes.toBytes(tenantId), Bytes.toBytes("00A" + Character.valueOf((char) ('3' + nextRunCount())) + ts))); // vary split point with test run
- int retryCount = 0;
- do {
- Thread.sleep(2000);
- retryCount++;
- //htable.clearRegionCache();
- } while (retryCount < 10 && htable.getRegionLocations().size() == nRegions);
- assertNotEquals(nRegions, htable.getRegionLocations().size());
- }
+ HTable htable = (HTable) conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(tableName);
+ htable.clearRegionCache();
+ int nRegions = htable.getRegionLocations().size();
+ admin.split(tableName, ByteUtil.concat(Bytes.toBytes(tenantId), Bytes.toBytes("00A" + Character.valueOf((char) ('3' + nextRunCount())) + ts))); // vary split point with test run
+ int retryCount = 0;
+ do {
+ Thread.sleep(2000);
+ retryCount++;
+ //htable.clearRegionCache();
+ } while (retryCount < 10 && htable.getRegionLocations().size() == nRegions);
+ assertNotEquals(nRegions, htable.getRegionLocations().size());
statement.setString(1, tenantId);
rs = statement.executeQuery();
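
With the existence check on the local index table removed above, the split-and-poll now runs for every parameterized variant. The pattern reads more clearly pulled out as a helper; a minimal sketch under the same 2-second/10-retry budget (the method name is illustrative):

    private static void splitAndWaitForNewRegion(HBaseAdmin admin, HTable htable,
            byte[] tableName, byte[] splitPoint) throws Exception {
        htable.clearRegionCache();                      // drop stale cached region locations
        int nRegions = htable.getRegionLocations().size();
        admin.split(tableName, splitPoint);             // request the split
        int retryCount = 0;
        do {
            Thread.sleep(2000);                         // give the regionserver time to split
            retryCount++;
        } while (retryCount < 10 && htable.getRegionLocations().size() == nRegions);
        assertNotEquals(nRegions, htable.getRegionLocations().size());
    }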
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4fa6146b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
index 28eddc2..a291a37 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
@@ -27,8 +27,10 @@ import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.List;
import java.util.Map;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -36,6 +38,8 @@ import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
@@ -48,6 +52,7 @@ import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
@@ -572,4 +577,109 @@ public class LocalIndexIT extends BaseIndexIT {
conn1.close();
}
}
+
+ @Test
+ public void testLocalIndexScanWithInList() throws Exception {
+ createBaseTable(DATA_TABLE_NAME, null, "('e','i','o')");
+ Connection conn1 = DriverManager.getConnection(getUrl());
+ try{
+ conn1.createStatement().execute("UPSERT INTO " + DATA_TABLE_NAME + " values('b',1,2,4,'z')");
+ conn1.createStatement().execute("UPSERT INTO " + DATA_TABLE_NAME + " values('f',1,2,3,'a')");
+ conn1.createStatement().execute("UPSERT INTO " + DATA_TABLE_NAME + " values('j',2,4,2,'a')");
+ conn1.createStatement().execute("UPSERT INTO " + DATA_TABLE_NAME + " values('q',3,1,1,'c')");
+ conn1.commit();
+ conn1.createStatement().execute("CREATE LOCAL INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_NAME + "(v1) include (k3)");
+
+ ResultSet rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + INDEX_TABLE_NAME);
+ assertTrue(rs.next());
+
+ HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
+
+ String query = "SELECT t_id FROM " + DATA_TABLE_NAME +" where (v1,k3) IN (('z',4),('a',2))";
+ rs = conn1.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals("j", rs.getString("t_id"));
+ assertTrue(rs.next());
+ assertEquals("b", rs.getString("t_id"));
+ assertFalse(rs.next());
+ } finally {
+ conn1.close();
+ }
+ }
+
+ @Test
+ public void testLocalIndexScanAfterRegionSplit() throws Exception {
+ createBaseTable(DATA_TABLE_NAME, null, "('e','j','o')");
+ Connection conn1 = DriverManager.getConnection(getUrl());
+ try{
+ String[] strings = {"a","b","c","d","e","f","g","h","i","j","k","l","m","n","o","p","q","r","s","t","u","v","w","x","y","z"};
+ for (int i = 0; i < 26; i++) {
+ conn1.createStatement().execute(
+ "UPSERT INTO " + DATA_TABLE_NAME + " values('"+strings[i]+"'," + i + ","
+ + (i + 1) + "," + (i + 2) + ",'" + strings[25 - i] + "')");
+ }
+ conn1.commit();
+ conn1.createStatement().execute("CREATE LOCAL INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_NAME + "(v1)");
+ conn1.createStatement().execute("CREATE LOCAL INDEX " + INDEX_TABLE_NAME + "_2 ON " + DATA_TABLE_NAME + "(k3)");
+
+ ResultSet rs = conn1.createStatement().executeQuery("SELECT * FROM " + DATA_TABLE_NAME);
+ assertTrue(rs.next());
+
+ HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
+ HMaster master = getUtility().getHBaseCluster().getMaster();
+ for (int i = 1; i < 5; i++) {
+
+ admin.split(Bytes.toBytes(DATA_TABLE_NAME), ByteUtil.concat(Bytes.toBytes(strings[3*i])));
+ List<HRegionInfo> regionsOfUserTable =
+ master.getAssignmentManager().getRegionStates().getRegionsOfTable(TableName.valueOf(DATA_TABLE_NAME));
+
+ while (regionsOfUserTable.size() != (4+i)) {
+ Thread.sleep(100);
+ regionsOfUserTable = master.getAssignmentManager().getRegionStates().getRegionsOfTable(TableName.valueOf(DATA_TABLE_NAME));
+ }
+ assertEquals(4+i, regionsOfUserTable.size());
+ List<HRegionInfo> regionsOfIndexTable = master.getAssignmentManager().getRegionStates()
+ .getRegionsOfTable(TableName.valueOf(MetaDataUtil.getLocalIndexTableName(DATA_TABLE_NAME)));
+ while (regionsOfIndexTable.size() != (4+i)) {
+ Thread.sleep(100);
+ regionsOfIndexTable = master.getAssignmentManager().getRegionStates()
+ .getRegionsOfTable(TableName.valueOf(MetaDataUtil.getLocalIndexTableName(DATA_TABLE_NAME)));
+ }
+ assertEquals(4 + i, regionsOfIndexTable.size());
+ String query = "SELECT t_id,k1,v1 FROM " + DATA_TABLE_NAME;
+ rs = conn1.createStatement().executeQuery("EXPLAIN "+query);
+ assertEquals(
+ "CLIENT PARALLEL " + (4+i) + "-WAY RANGE SCAN OVER "
+ + MetaDataUtil.getLocalIndexTableName(DATA_TABLE_NAME)+" [-32768]\n"+
+ "CLIENT MERGE SORT",
+ QueryUtil.getExplainPlan(rs));
+ rs = conn1.createStatement().executeQuery(query);
+ Thread.sleep(1000);
+ for (int j = 0; j < 26; j++) {
+ assertTrue(rs.next());
+ assertEquals(strings[25-j], rs.getString("t_id"));
+ assertEquals(25-j, rs.getInt("k1"));
+ assertEquals(strings[j], rs.getString("V1"));
+ }
+
+ query = "SELECT t_id,k1,k3 FROM " + DATA_TABLE_NAME;
+ rs = conn1.createStatement().executeQuery("EXPLAIN "+query);
+ assertEquals(
+ "CLIENT PARALLEL " + (4+i) + "-WAY RANGE SCAN OVER "
+ + MetaDataUtil.getLocalIndexTableName(DATA_TABLE_NAME)+" [-32767]\n"+
+ "CLIENT MERGE SORT",
+ QueryUtil.getExplainPlan(rs));
+ rs = conn1.createStatement().executeQuery(query);
+ Thread.sleep(1000);
+ for (int j = 0; j < 26; j++) {
+ assertTrue(rs.next());
+ assertEquals(strings[j], rs.getString("t_id"));
+ assertEquals(j, rs.getInt("k1"));
+ assertEquals(j+2, rs.getInt("k3"));
+ }
+ }
+ } finally {
+ conn1.close();
+ }
+ }
}
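
A note on the [-32768] and [-32767] range keys asserted in the EXPLAIN plans above: they are the view index identifiers of the two local indexes, handed out sequentially starting from Short.MIN_VALUE. A sketch of that numbering (the variable names are illustrative):

    short firstLocalIndexId = Short.MIN_VALUE;                // -32768, INDEX_TABLE_NAME
    short secondLocalIndexId = (short) (Short.MIN_VALUE + 1); // -32767, INDEX_TABLE_NAME_2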
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4fa6146b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java
new file mode 100644
index 0000000..d8650cf
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java
@@ -0,0 +1,458 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.Reference;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.index.IndexMaintainer;
+
+/**
+ * A facade for a {@link org.apache.hadoop.hbase.io.hfile.HFile.Reader} that serves up either the
+ * top or bottom half of an HFile, where 'bottom' is the first half of the file containing the keys
+ * that sort lowest and 'top' is the second half of the file with keys that sort greater than those
+ * of the bottom half. The top includes the split file's midkey, or the key that follows if it does
+ * not exist in the file.
+ *
+ * <p>
+ * This type works in tandem with the {@link Reference} type. This class is used for reading while
+ * Reference is used for writing.
+ *
+ * <p>
+ * This file is not splittable. Calls to {@link #midkey()} return null.
+ */
+
+public class IndexHalfStoreFileReader extends StoreFile.Reader {
+ private static final int ROW_KEY_LENGTH = 2;
+ private final boolean top;
+ // This is the key we split around. It's the first possible entry on a row:
+ // i.e. empty column and a timestamp of LATEST_TIMESTAMP.
+ private final byte[] splitkey;
+ private final byte[] splitRow;
+ private final Map<ImmutableBytesWritable, IndexMaintainer> indexMaintainers;
+ private final byte[][] viewConstants;
+ private final int offset;
+ private final HRegionInfo regionInfo;
+ private final HRegionInfo parent;
+
+ /**
+ * @param fs filesystem to read from
+ * @param p path to the HFile
+ * @param cacheConf cache configuration
+ * @param r the {@link Reference} naming which half of the file to serve
+ */
+ public IndexHalfStoreFileReader(final FileSystem fs, final Path p, final CacheConfig cacheConf,
+ final Reference r, final Configuration conf,
+ final Map<ImmutableBytesWritable, IndexMaintainer> indexMaintainers,
+ final byte[][] viewConstants, final HRegionInfo regionInfo,
+ final HRegionInfo parent) throws IOException {
+ super(fs, p, cacheConf, conf);
+ this.splitkey = r.getSplitKey();
+ // Is it top or bottom half?
+ this.top = Reference.isTopFileRegion(r.getFileRegion());
+ this.splitRow = CellUtil.cloneRow(KeyValue.createKeyValueFromKey(splitkey));
+ this.indexMaintainers = indexMaintainers;
+ this.viewConstants = viewConstants;
+ this.regionInfo = regionInfo;
+ this.parent = parent;
+ this.offset =
+ parent.getStartKey().length != 0 ? parent.getStartKey().length
+ : parent.getEndKey().length;
+ }
+
+ /**
+ * @param fs filesystem to read from
+ * @param p path to the HFile
+ * @param cacheConf cache configuration
+ * @param r the {@link Reference} naming which half of the file to serve
+ */
+ public IndexHalfStoreFileReader(final FileSystem fs, final Path p, final CacheConfig cacheConf,
+ final FSDataInputStreamWrapper in, long size, final Reference r,
+ final Configuration conf,
+ final Map<ImmutableBytesWritable, IndexMaintainer> indexMaintainers,
+ final byte[][] viewConstants, final HRegionInfo regionInfo, final HRegionInfo parent)
+ throws IOException {
+ super(fs, p, in, size, cacheConf, conf);
+ this.splitkey = r.getSplitKey();
+ // Is it top or bottom half?
+ this.top = Reference.isTopFileRegion(r.getFileRegion());
+ this.splitRow = CellUtil.cloneRow(KeyValue.createKeyValueFromKey(splitkey));
+ this.indexMaintainers = indexMaintainers;
+ this.viewConstants = viewConstants;
+ this.regionInfo = regionInfo;
+ this.parent = parent;
+ this.offset =
+ parent.getStartKey().length != 0 ? parent.getStartKey().length
+ : parent.getEndKey().length;
+ }
+
+ protected boolean isTop() {
+ return this.top;
+ }
+
+ @Override
+ public HFileScanner getScanner(final boolean cacheBlocks, final boolean pread,
+ final boolean isCompaction) {
+ final HFileScanner s = super.getScanner(cacheBlocks, pread, isCompaction);
+ return new HFileScanner() {
+ final HFileScanner delegate = s;
+ public boolean atEnd = false;
+
+ public ByteBuffer getKey() {
+ if (atEnd) {
+ return null;
+ }
+ boolean changeBottomKeys =
+ regionInfo.getStartKey().length == 0 && splitRow.length != offset;
+ if (!top) {
+ // For the first region the row keys are prefixed with an empty byte array
+ // the length of the region end key. So if the split row length differs from
+ // that length, the empty prefix must be rewritten, because after the split
+ // the region's end key is the split row.
+ if(!changeBottomKeys) return delegate.getKey();
+ }
+ // If it is a top store file, replace the start key portion of the key with the split key
+ return getChangedKey(delegate.getKeyValue(), changeBottomKeys);
+ }
+
+ private ByteBuffer getChangedKey(KeyValue kv, boolean changeBottomKeys) {
+ // new KeyValue(row, family, qualifier, timestamp, type, value)
+ byte[] newRowkey = getNewRowkeyByRegionStartKeyReplacedWithSplitKey(kv, changeBottomKeys);
+ KeyValue newKv =
+ new KeyValue(newRowkey, 0, newRowkey.length, kv.getFamilyArray(),
+ kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(),
+ kv.getQualifierOffset(), kv.getQualifierLength(),
+ kv.getTimestamp(), Type.codeToType(kv.getTypeByte()), null, 0, 0);
+ ByteBuffer keyBuffer = ByteBuffer.wrap(newKv.getKey());
+ return keyBuffer;
+ }
+
+ private byte[] getNewRowkeyByRegionStartKeyReplacedWithSplitKey(KeyValue kv, boolean changeBottomKeys) {
+ int lenOfRemainingKey = kv.getRowLength() - offset;
+ byte[] keyReplacedStartKey = new byte[lenOfRemainingKey + splitRow.length];
+ System.arraycopy(changeBottomKeys ? new byte[splitRow.length] : splitRow, 0,
+ keyReplacedStartKey, 0, splitRow.length);
+ System.arraycopy(kv.getRowArray(), kv.getRowOffset() + offset, keyReplacedStartKey,
+ splitRow.length, lenOfRemainingKey);
+ return keyReplacedStartKey;
+ }
+
+ public String getKeyString() {
+ if (atEnd) {
+ return null;
+ }
+ return Bytes.toStringBinary(getKey());
+ }
+
+ public ByteBuffer getValue() {
+ if (atEnd) {
+ return null;
+ }
+ return delegate.getValue();
+ }
+
+ public String getValueString() {
+ if (atEnd) {
+ return null;
+ }
+ return Bytes.toStringBinary(getValue());
+ }
+
+ public KeyValue getKeyValue() {
+ if (atEnd) {
+ return null;
+ }
+ KeyValue kv = delegate.getKeyValue();
+ boolean changeBottomKeys =
+ regionInfo.getStartKey().length == 0 && splitRow.length != offset;
+ if (!top) {
+ if(!changeBottomKeys) return kv;
+ }
+ // If it is a top store file, replace the start key with the split key in the key
+ // and produce the new KeyValue corresponding to the changed key
+ byte[] changedKey = getNewRowkeyByRegionStartKeyReplacedWithSplitKey(kv, changeBottomKeys);
+ KeyValue changedKv =
+ new KeyValue(changedKey, 0, changedKey.length, kv.getFamilyArray(),
+ kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(),
+ kv.getQualifierOffset(), kv.getQualifierLength(),
+ kv.getTimestamp(), Type.codeToType(kv.getTypeByte()),
+ kv.getValueArray(), kv.getValueOffset(), kv.getValueLength(),
+ kv.getTags());
+ return changedKv;
+ }
+
+ public boolean next() throws IOException {
+ if (atEnd) {
+ return false;
+ }
+ while (true) {
+ boolean b = delegate.next();
+ if (!b) {
+ atEnd = true;
+ return b;
+ }
+ // We need to check whether the current KV pointed to by this reader
+ // belongs to this split or not.
+ // In case of top store file if the ActualRowKey >= SplitKey
+ // In case of bottom store file if the ActualRowKey < Splitkey
+ if (isSatisfiedMidKeyCondition(delegate.getKeyValue())) {
+ return true;
+ }
+ }
+ }
+
+ public boolean seekBefore(byte[] key) throws IOException {
+ return seekBefore(key, 0, key.length);
+ }
+
+ public boolean seekBefore(byte[] key, int offset, int length) throws IOException {
+ if (top) {
+ byte[] fk = getFirstKey();
+ // This will be null when the file is empty, in which case we cannot
+ // seekBefore to any key
+ if (fk == null) {
+ return false;
+ }
+ if (getComparator().compare(key, offset, length, fk, 0, fk.length) <= 0) {
+ return false;
+ }
+ KeyValue replacedKey = getKeyPresentInHFiles(key);
+ return this.delegate.seekBefore(replacedKey.getBuffer(),
+ replacedKey.getKeyOffset(), replacedKey.getKeyLength());
+ } else {
+ // The equals sign isn't strictly necessary; it's just here to be
+ // consistent with seekTo
+ if (getComparator().compare(key, offset, length, splitkey, 0, splitkey.length) >= 0) {
+ return this.delegate.seekBefore(splitkey, 0, splitkey.length);
+ }
+ }
+ return this.delegate.seekBefore(key, offset, length);
+ }
+
+ public boolean seekTo() throws IOException {
+ boolean b = delegate.seekTo();
+ if (!b) {
+ atEnd = true;
+ return b;
+ }
+ while (true) {
+ // We need to find the first occurrence satisfying the condition
+ // In case of top store file if the ActualRowKey >= SplitKey
+ // In case of bottom store file if the ActualRowKey < Splitkey
+ if (isSatisfiedMidKeyCondition(delegate.getKeyValue())) {
+ return true;
+ }
+ b = delegate.next();
+ if (!b) {
+ return b;
+ }
+ }
+ }
+
+ public int seekTo(byte[] key) throws IOException {
+ return seekTo(key, 0, key.length);
+ }
+
+ public int seekTo(byte[] key, int offset, int length) throws IOException {
+ if (top) {
+ if (getComparator().compare(key, offset, length, splitkey, 0, splitkey.length) < 0) {
+ return -1;
+ }
+ KeyValue replacedKey = getKeyPresentInHFiles(key);
+
+ int seekTo =
+ delegate.seekTo(replacedKey.getBuffer(), replacedKey.getKeyOffset(),
+ replacedKey.getKeyLength());
+ return seekTo;
+ /*
+ * if (seekTo == 0 || seekTo == -1) { return seekTo; } else if (seekTo == 1) {
+ * boolean next = this.next(); }
+ */
+ } else {
+ if (getComparator().compare(key, offset, length, splitkey, 0, splitkey.length) >= 0) {
+ // we would place the scanner in the second half.
+ // it might be an error to return false here ever...
+ boolean res = delegate.seekBefore(splitkey, 0, splitkey.length);
+ if (!res) {
+ throw new IOException(
+ "Seeking for a key in bottom of file, but key exists in top of file, failed on seekBefore(midkey)");
+ }
+ return 1;
+ }
+ }
+ return delegate.seekTo(key, offset, length);
+ }
+
+ public int reseekTo(byte[] key) throws IOException {
+ return reseekTo(key, 0, key.length);
+ }
+
+ public int reseekTo(byte[] key, int offset, int length) throws IOException {
+ if (top) {
+ if (getComparator().compare(key, offset, length, splitkey, 0, splitkey.length) < 0) {
+ return -1;
+ }
+ KeyValue replacedKey = getKeyPresentInHFiles(key);
+ return delegate.reseekTo(replacedKey.getBuffer(), replacedKey.getKeyOffset(),
+ replacedKey.getKeyLength());
+ } else {
+ if (getComparator().compare(key, offset, length, splitkey, 0, splitkey.length) >= 0) {
+ // we would place the scanner in the second half.
+ // it might be an error to return false here ever...
+ boolean res = delegate.seekBefore(splitkey, 0, splitkey.length);
+ if (!res) {
+ throw new IOException(
+ "Seeking for a key in bottom of file, but key exists in top of file, failed on seekBefore(midkey)");
+ }
+ return 1;
+ }
+ }
+ return delegate.reseekTo(key, offset, length);
+ }
+
+ public org.apache.hadoop.hbase.io.hfile.HFile.Reader getReader() {
+ return this.delegate.getReader();
+ }
+
+ // TODO: Need to change as per IndexHalfStoreFileReader
+ public boolean isSeeked() {
+ return this.delegate.isSeeked();
+ }
+ };
+ }
+
+ private boolean isSatisfiedMidKeyCondition(KeyValue kv) {
+ if (CellUtil.isDelete(kv) && kv.getValueLength() == 0) {
+ // In case of a Delete type KV, let it go to both daughter regions. There is
+ // no problem in doing so: in the daughter region where it actually belongs, this
+ // delete tombstone will really delete a KV. In the other it will just hang around
+ // with no actual KV for it to delete.
+ return true;
+ }
+ ImmutableBytesWritable rowKey =
+ new ImmutableBytesWritable(kv.getRowArray(), kv.getRowOffset() + offset,
+ kv.getRowLength() - offset);
+ Entry<ImmutableBytesWritable, IndexMaintainer> entry = indexMaintainers.entrySet().iterator().next();
+ IndexMaintainer indexMaintainer = entry.getValue();
+ byte[] viewIndexId = indexMaintainer.getViewIndexIdFromIndexRowKey(rowKey);
+ IndexMaintainer actualIndexMaintainer = indexMaintainers.get(new ImmutableBytesWritable(viewIndexId));
+ byte[] dataRowKey = actualIndexMaintainer.buildDataRowKey(rowKey, this.viewConstants);
+ int compareResult = Bytes.compareTo(dataRowKey, splitRow);
+ if (top) {
+ if (compareResult >= 0) {
+ return true;
+ }
+ } else {
+ if (compareResult < 0) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * In the case of a top half store, the passed key carries the start key of the daughter
+ * region, but in the actual HFiles the key carries the start key of the old parent region.
+ * To make the real seek in the HFiles, we need to rebuild the old key.
+ *
+ * The logic is simply to replace the daughter region start key with the parent region
+ * start key in the key part.
+ *
+ * @param key the key as seen by the daughter region
+ */
+ private KeyValue getKeyPresentInHFiles(byte[] key) {
+ KeyValue keyValue = new KeyValue(key);
+ int rowLength = keyValue.getRowLength();
+ int rowOffset = keyValue.getRowOffset();
+ byte[] parentStartKey =
+ parent.getStartKey().length == 0 ? new byte[parent.getEndKey().length] : parent
+ .getStartKey();
+ int daughterStartKeyLength =
+ regionInfo.getStartKey().length == 0 ? regionInfo.getEndKey().length : regionInfo
+ .getStartKey().length;
+
+ // This comes in the case of a DeleteFamily marker
+ if (top
+ && 0 == keyValue.getValueLength()
+ && keyValue.getTimestamp() == HConstants.LATEST_TIMESTAMP
+ && Bytes.compareTo(keyValue.getRowArray(), keyValue.getRowOffset(),
+ keyValue.getRowLength(), splitRow, 0, splitRow.length) == 0
+ && keyValue.isDeleteFamily()) {
+ KeyValue createFirstDeleteFamilyOnRow =
+ KeyValue.createFirstDeleteFamilyOnRow(parentStartKey, keyValue.getFamily());
+ return createFirstDeleteFamilyOnRow;
+ }
+
+ short length = (short) (keyValue.getRowLength() - daughterStartKeyLength + parentStartKey.length);
+ byte[] replacedKey =
+ new byte[length + key.length - (rowOffset + rowLength) + ROW_KEY_LENGTH];
+ System.arraycopy(Bytes.toBytes(length), 0, replacedKey, 0, ROW_KEY_LENGTH);
+ System.arraycopy(parentStartKey, 0, replacedKey, ROW_KEY_LENGTH, parentStartKey.length);
+ System.arraycopy(keyValue.getRowArray(), keyValue.getRowOffset() + daughterStartKeyLength,
+ replacedKey, parentStartKey.length + ROW_KEY_LENGTH, keyValue.getRowLength()
+ - daughterStartKeyLength);
+ System.arraycopy(key, rowOffset + rowLength, replacedKey,
+ parentStartKey.length + keyValue.getRowLength() - daughterStartKeyLength
+ + ROW_KEY_LENGTH, key.length - (rowOffset + rowLength));
+ return KeyValue.createKeyValueFromKey(replacedKey);
+ }
+
+ @Override
+ public byte[] getLastKey() {
+ // This method won't get used for the index region. There is no need to call
+ // getClosestRowBefore on the index table. Also, this is a split region and
+ // cannot be split further.
+ throw new UnsupportedOperationException("Method is not implemented!");
+ }
+
+ @Override
+ public byte[] midkey() throws IOException {
+ // Returns null to indicate the file is not splittable.
+ return null;
+ }
+
+ @Override
+ public byte[] getFirstKey() {
+ return super.getFirstKey();
+ }
+
+ @Override
+ public boolean passesKeyRangeFilter(Scan scan) {
+ return true;
+ }
+}
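
The heart of the reader above is a row-key rewrite: keys stored under the parent region's key prefix (the first offset bytes) are served back with the daughter's split row in its place. A minimal standalone sketch of that transformation, mirroring getNewRowkeyByRegionStartKeyReplacedWithSplitKey (the helper name is illustrative):

    // Rewrites rowKey = parentPrefix (offset bytes) + remainder
    // into splitRow + remainder.
    static byte[] replaceRegionPrefix(byte[] rowKey, int offset, byte[] splitRow) {
        byte[] out = new byte[splitRow.length + rowKey.length - offset];
        System.arraycopy(splitRow, 0, out, 0, splitRow.length);
        System.arraycopy(rowKey, offset, out, splitRow.length, rowKey.length - offset);
        return out;
    }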
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4fa6146b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java
new file mode 100644
index 0000000..b04227f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.Reference;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+
+public class IndexHalfStoreFileReaderGenerator extends BaseRegionObserver {
+
+ @Override
+ public Reader preStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment> ctx,
+ FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf,
+ Reference r, Reader reader) throws IOException {
+ TableName tableName = ctx.getEnvironment().getRegion().getTableDesc().getTableName();
+ HRegion region = ctx.getEnvironment().getRegion();
+ if (reader == null && r != null) {
+ Scan scan = MetaReader.getScanForTableName(tableName);
+ SingleColumnValueFilter scvf = null;
+ if (Reference.isTopFileRegion(r.getFileRegion())) {
+ scvf = new SingleColumnValueFilter(HConstants.CATALOG_FAMILY,
+ HConstants.SPLITB_QUALIFIER, CompareOp.EQUAL, region.getRegionInfo().toByteArray());
+ scvf.setFilterIfMissing(true);
+ } else {
+ scvf = new SingleColumnValueFilter(HConstants.CATALOG_FAMILY,
+ HConstants.SPLITA_QUALIFIER, CompareOp.EQUAL, region.getRegionInfo().toByteArray());
+ scvf.setFilterIfMissing(true);
+ }
+ if(scvf != null) scan.setFilter(scvf);
+ HRegionInfo parentRegion = null;
+ HTable metaTable = null;
+ PhoenixConnection conn = null;
+ try {
+ metaTable = new HTable(ctx.getEnvironment().getConfiguration(), TableName.META_TABLE_NAME);
+ ResultScanner scanner = metaTable.getScanner(scan);
+ Result result = scanner.next();
+ if (result == null || result.isEmpty()) return reader;
+ parentRegion = HRegionInfo.getHRegionInfo(result);
+ } finally {
+ if (metaTable != null) metaTable.close();
+ }
+ try {
+ conn = QueryUtil.getConnection(ctx.getEnvironment().getConfiguration()).unwrap(
+ PhoenixConnection.class);
+ String userTableName = MetaDataUtil.getUserTableName(tableName.getNameAsString());
+ PTable dataTable = PhoenixRuntime.getTable(conn, userTableName);
+ List<PTable> indexes = dataTable.getIndexes();
+ Map<ImmutableBytesWritable, IndexMaintainer> indexMaintainers =
+ new HashMap<ImmutableBytesWritable, IndexMaintainer>();
+ for (PTable index : indexes) {
+ if (index.getIndexType() == IndexType.LOCAL) {
+ IndexMaintainer indexMaintainer = index.getIndexMaintainer(dataTable);
+ indexMaintainers.put(new ImmutableBytesWritable(MetaDataUtil
+ .getViewIndexIdDataType().toBytes(index.getViewIndexId())),
+ indexMaintainer);
+ }
+ }
+ if(indexMaintainers.isEmpty()) return reader;
+ byte[][] viewConstants = getViewConstants(dataTable);
+ return new IndexHalfStoreFileReader(fs, p, cacheConf, in, size, r, ctx
+ .getEnvironment().getConfiguration(), indexMaintainers, viewConstants,
+ region.getRegionInfo(), parentRegion);
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ } catch (SQLException e) {
+ throw new IOException(e);
+ } finally {
+ if (conn != null) {
+ try {
+ conn.close();
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+ }
+ return reader;
+ }
+
+ private byte[][] getViewConstants(PTable dataTable) {
+ int dataPosOffset = (dataTable.getBucketNum() != null ? 1 : 0) + (dataTable.isMultiTenant() ? 1 : 0);
+ byte[][] viewConstants = null;
+ int nViewConstants = 0;
+ if (dataTable.getType() == PTableType.VIEW) {
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ List<PColumn> dataPkColumns = dataTable.getPKColumns();
+ for (int i = dataPosOffset; i < dataPkColumns.size(); i++) {
+ PColumn dataPKColumn = dataPkColumns.get(i);
+ if (dataPKColumn.getViewConstant() != null) {
+ nViewConstants++;
+ }
+ }
+ if (nViewConstants > 0) {
+ viewConstants = new byte[nViewConstants][];
+ int j = 0;
+ for (int i = dataPosOffset; i < dataPkColumns.size(); i++) {
+ PColumn dataPkColumn = dataPkColumns.get(i);
+ if (dataPkColumn.getViewConstant() != null) {
+ if (IndexUtil.getViewConstantValue(dataPkColumn, ptr)) {
+ viewConstants[j++] = ByteUtil.copyKeyBytesIfNecessary(ptr);
+ } else {
+ throw new IllegalStateException();
+ }
+ }
+ }
+ }
+ }
+ return viewConstants;
+ }
+}
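
For this observer to fire, it must be installed on the local index table; per the file list above, the commit wires that up in ConnectionQueryServicesImpl. A minimal sketch of such a registration (the table name is illustrative, not taken from the commit):

    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("_LOCAL_IDX_MYTABLE"));
    // Attach the generator so that, after a split, daughter regions open the
    // parent's HFiles through IndexHalfStoreFileReader and rewrite index row keys.
    desc.addCoprocessor(IndexHalfStoreFileReaderGenerator.class.getName());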
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4fa6146b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java
new file mode 100644
index 0000000..87e7d81
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java
@@ -0,0 +1,974 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REQUEST_REGION_SPLIT;
+import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_SPLIT;
+import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_SPLITTING;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.RegionTransition;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.catalog.MetaEditor;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.io.Reference;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.HasThread;
+import org.apache.hadoop.hbase.util.PairOfSameType;
+import org.apache.hadoop.hbase.zookeeper.ZKAssign;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.data.Stat;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Executes a region split as a "transaction". Call {@link #prepare()} to set up
+ * the transaction, {@link #execute(Server, RegionServerServices)} to run the
+ * transaction, and {@link #rollback(Server, RegionServerServices)} to clean up if execute fails.
+ *
+ * <p>Here is an example of how you would use this class:
+ * <pre>
+ * SplitTransaction st = new SplitTransaction(this.conf, parent, midKey)
+ * if (!st.prepare()) return;
+ * try {
+ * st.execute(server, services);
+ * } catch (IOException ioe) {
+ * try {
+ * st.rollback(server, services);
+ * return;
+ * } catch (RuntimeException e) {
+ * myAbortable.abort("Failed split, abort");
+ * }
+ * }
+ * </pre>
+ * <p>This class is not thread safe. Callers must ensure the split is run by
+ * only one thread.
+ */
+@InterfaceAudience.Private
+public class IndexSplitTransaction {
+ private static final Log LOG = LogFactory.getLog(IndexSplitTransaction.class);
+
+ /*
+ * Region to split
+ */
+ private final HRegion parent;
+ private HRegionInfo hri_a;
+ private HRegionInfo hri_b;
+ private long fileSplitTimeout = 30000;
+ private int znodeVersion = -1;
+
+ /*
+ * Row to split around
+ */
+ private final byte [] splitrow;
+
+ /**
+ * Types to add to the transaction journal.
+ * Each enum value is a step in the split transaction. Used to figure out how much
+ * we need to roll back.
+ */
+ enum JournalEntry {
+ /**
+ * Set region as in transition, set it into SPLITTING state.
+ */
+ SET_SPLITTING_IN_ZK,
+ /**
+ * We created the temporary split data directory.
+ */
+ CREATE_SPLIT_DIR,
+ /**
+ * Closed the parent region.
+ */
+ CLOSED_PARENT_REGION,
+ /**
+ * The parent has been taken out of the server's online regions list.
+ */
+ OFFLINED_PARENT,
+ /**
+ * Started in on creation of the first daughter region.
+ */
+ STARTED_REGION_A_CREATION,
+ /**
+ * Started in on the creation of the second daughter region.
+ */
+ STARTED_REGION_B_CREATION,
+ /**
+ * Point of no return.
+ * If we get here, then the transaction is not recoverable other than by
+ * crashing out the regionserver.
+ */
+ PONR
+ }
+
+ /*
+ * Journal of how far the split transaction has progressed.
+ */
+ private final List<JournalEntry> journal = new ArrayList<JournalEntry>();
+
+ /**
+ * Constructor
+ * @param r Region to split
+ * @param splitrow Row to split around
+ */
+ public IndexSplitTransaction(final HRegion r, final byte [] splitrow) {
+ this.parent = r;
+ this.splitrow = splitrow;
+ }
+
+ /**
+ * Does checks on split inputs.
+ * @return <code>true</code> if the region is splittable else
+ * <code>false</code> if it is not (e.g. its already closed, etc.).
+ */
+ public boolean prepare() {
+ if (!this.parent.isSplittable()) return false;
+ // Split key can be null if this region is unsplittable; i.e. has refs.
+ if (this.splitrow == null) return false;
+ HRegionInfo hri = this.parent.getRegionInfo();
+ parent.prepareToSplit();
+ // Check splitrow.
+ byte [] startKey = hri.getStartKey();
+ byte [] endKey = hri.getEndKey();
+ if (Bytes.equals(startKey, splitrow) ||
+ !this.parent.getRegionInfo().containsRow(splitrow)) {
+ LOG.info("Split row is not inside region key range or is equal to " +
+ "startkey: " + Bytes.toStringBinary(this.splitrow));
+ return false;
+ }
+ long rid = getDaughterRegionIdTimestamp(hri);
+ this.hri_a = new HRegionInfo(hri.getTable(), startKey, this.splitrow, false, rid);
+ this.hri_b = new HRegionInfo(hri.getTable(), this.splitrow, endKey, false, rid);
+ return true;
+ }
+
+ /**
+ * Calculate daughter regionid to use.
+ * @param hri Parent {@link HRegionInfo}
+ * @return Daughter region id (timestamp) to use.
+ */
+ private static long getDaughterRegionIdTimestamp(final HRegionInfo hri) {
+ long rid = EnvironmentEdgeManager.currentTimeMillis();
+ // Region id is a timestamp and can't be less than that of the parent, else it
+ // will insert at the wrong location in hbase:meta (see HBASE-710).
+ if (rid < hri.getRegionId()) {
+ LOG.warn("Clock skew; parent regions id is " + hri.getRegionId() +
+ " but current time here is " + rid);
+ rid = hri.getRegionId() + 1;
+ }
+ return rid;
+ }
+
+ private static IOException closedByOtherException = new IOException(
+ "Failed to close region: already closed by another thread");
+
+ /**
+ * Prepare the regions and region files.
+ * @param server Hosting server instance. Can be null when testing (won't try
+ * and update in zk if a null server)
+ * @param services Used to online/offline regions.
+ * @throws IOException If thrown, transaction failed.
+ * Call {@link #rollback(Server, RegionServerServices)}
+ * @return Regions created
+ */
+ /* package */PairOfSameType<HRegion> createDaughters(final Server server,
+ final RegionServerServices services) throws IOException {
+ LOG.info("Starting split of region " + this.parent);
+ if ((server != null && server.isStopped()) ||
+ (services != null && services.isStopping())) {
+ throw new IOException("Server is stopped or stopping");
+ }
+ assert !this.parent.lock.writeLock().isHeldByCurrentThread():
+ "Unsafe to hold write lock while performing RPCs";
+
+ // Coprocessor callback
+ if (this.parent.getCoprocessorHost() != null) {
+ this.parent.getCoprocessorHost().preSplit();
+ }
+
+ // Coprocessor callback
+ if (this.parent.getCoprocessorHost() != null) {
+ this.parent.getCoprocessorHost().preSplit(this.splitrow);
+ }
+
+ // If true, no cluster to write meta edits to or to update znodes in.
+ boolean testing = server == null? true:
+ server.getConfiguration().getBoolean("hbase.testing.nocluster", false);
+ this.fileSplitTimeout = testing ? this.fileSplitTimeout :
+ server.getConfiguration().getLong("hbase.regionserver.fileSplitTimeout",
+ this.fileSplitTimeout);
+
+ PairOfSameType<HRegion> daughterRegions = stepsBeforePONR(server, services, testing);
+
+ List<Mutation> metaEntries = new ArrayList<Mutation>();
+ if (this.parent.getCoprocessorHost() != null) {
+ if (this.parent.getCoprocessorHost().
+ preSplitBeforePONR(this.splitrow, metaEntries)) {
+ throw new IOException("Coprocessor bypassing region "
+ + this.parent.getRegionNameAsString() + " split.");
+ }
+ try {
+ for (Mutation p : metaEntries) {
+ HRegionInfo.parseRegionName(p.getRow());
+ }
+ } catch (IOException e) {
+ LOG.error("Row key of mutation from coprossor is not parsable as region name."
+ + "Mutations from coprocessor should only for hbase:meta table.");
+ throw e;
+ }
+ }
+
+ // This is the point of no return. Adding subsequent edits to .META. as we
+ // do below when we do the daughter opens adding each to .META. can fail in
+ // various interesting ways the most interesting of which is a timeout
+ // BUT the edits all go through (See HBASE-3872). IF we reach the PONR
+ // then subsequent failures need to crash out this regionserver; the
+ // server shutdown processing should be able to fix-up the incomplete split.
+ // The offlined parent will have the daughters as extra columns. If
+ // we leave the daughter regions in place and do not remove them when we
+ // crash out, then they will have their references to the parent in place
+ // still and the server shutdown fixup of .META. will point to these
+ // regions.
+ // We should add the PONR JournalEntry before offlineParentInMeta, so that even if
+ // offlineParentInMeta times out, the regionserver will exit and the master's
+ // ServerShutdownHandler will fix up the daughters and avoid data loss (see
+ // HBASE-4562).
+ this.journal.add(JournalEntry.PONR);
+
+ // Edit parent in meta. Offlines the parent region and adds splita and splitb
+ // as an atomic update. See HBASE-7721. In case of failure, this update to META
+ // determines whether the region is considered split or not.
+ // If it succeeded, the master will roll forward; if not, the master will roll back
+ // and reassign the parent region.
+ if (!testing) {
+ if (metaEntries == null || metaEntries.isEmpty()) {
+ MetaEditor.splitRegion(server.getCatalogTracker(),
+ parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(),
+ daughterRegions.getSecond().getRegionInfo(), server.getServerName());
+ } else {
+ offlineParentInMetaAndputMetaEntries(server.getCatalogTracker(),
+ parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(), daughterRegions
+ .getSecond().getRegionInfo(), server.getServerName(), metaEntries);
+ }
+ }
+ return daughterRegions;
+ }
+
+ public PairOfSameType<HRegion> stepsBeforePONR(final Server server,
+ final RegionServerServices services, boolean testing) throws IOException {
+ // Set ephemeral SPLITTING znode up in zk. Mocked servers sometimes don't
+ // have zookeeper so don't do zk stuff if server or zookeeper is null
+ if (server != null && server.getZooKeeper() != null) {
+ try {
+ createNodeSplitting(server.getZooKeeper(),
+ parent.getRegionInfo(), server.getServerName(), hri_a, hri_b);
+ } catch (KeeperException e) {
+ throw new IOException("Failed creating PENDING_SPLIT znode on " +
+ this.parent.getRegionNameAsString(), e);
+ }
+ }
+ this.journal.add(JournalEntry.SET_SPLITTING_IN_ZK);
+ if (server != null && server.getZooKeeper() != null) {
+ // After creating the split node, wait for the master to transition it
+ // from PENDING_SPLIT to SPLITTING so that we can move on. We want the master
+ // to know about it so it won't transition any region which is splitting.
+ znodeVersion = getZKNode(server, services);
+ }
+
+ this.parent.getRegionFileSystem().createSplitsDir();
+ this.journal.add(JournalEntry.CREATE_SPLIT_DIR);
+
+ Map<byte[], List<StoreFile>> hstoreFilesToSplit = null;
+ Exception exceptionToThrow = null;
+ try{
+ hstoreFilesToSplit = this.parent.close(false);
+ } catch (Exception e) {
+ exceptionToThrow = e;
+ }
+ if (exceptionToThrow == null && hstoreFilesToSplit == null) {
+ // The region was closed by a concurrent thread. We can't continue
+ // with the split, instead we must just abandon the split. If we
+ // reopen or split this could cause problems because the region has
+ // probably already been moved to a different server, or is in the
+ // process of moving to a different server.
+ exceptionToThrow = closedByOtherException;
+ }
+ if (exceptionToThrow != closedByOtherException) {
+ this.journal.add(JournalEntry.CLOSED_PARENT_REGION);
+ }
+ if (exceptionToThrow != null) {
+ if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow;
+ throw new IOException(exceptionToThrow);
+ }
+ if (!testing) {
+ services.removeFromOnlineRegions(this.parent, null);
+ }
+ this.journal.add(JournalEntry.OFFLINED_PARENT);
+
+ // TODO: If splitStoreFiles were multithreaded would we complete steps in
+ // less elapsed time? St.Ack 20100920
+ //
+ // splitStoreFiles creates daughter region dirs under the parent splits dir
+ // Nothing to unroll here if failure -- clean up of CREATE_SPLIT_DIR will
+ // clean this up.
+ splitStoreFiles(hstoreFilesToSplit);
+
+ // Log to the journal that we are creating region A, the first daughter
+ // region. We could fail halfway through. If we do, we could have left
+ // stuff in fs that needs cleanup -- a storefile or two. That's why we
+ // add the entry to the journal BEFORE rather than AFTER the change.
+ this.journal.add(JournalEntry.STARTED_REGION_A_CREATION);
+ HRegion a = this.parent.createDaughterRegionFromSplits(this.hri_a);
+
+ // Ditto
+ this.journal.add(JournalEntry.STARTED_REGION_B_CREATION);
+ HRegion b = this.parent.createDaughterRegionFromSplits(this.hri_b);
+ return new PairOfSameType<HRegion>(a, b);
+ }
+
+ /**
+ * Perform time consuming opening of the daughter regions.
+ * @param server Hosting server instance. Can be null when testing (won't try
+ * and update in zk if a null server)
+ * @param services Used to online/offline regions.
+ * @param a first daughter region
+ * @param b second daughter region
+ * @throws IOException If thrown, transaction failed.
+ * Call {@link #rollback(Server, RegionServerServices)}
+ */
+ /* package */void openDaughters(final Server server,
+ final RegionServerServices services, HRegion a, HRegion b)
+ throws IOException {
+ boolean stopped = server != null && server.isStopped();
+ boolean stopping = services != null && services.isStopping();
+ // TODO: Is this check needed here?
+ if (stopped || stopping) {
+ LOG.info("Not opening daughters " +
+ b.getRegionInfo().getRegionNameAsString() +
+ " and " +
+ a.getRegionInfo().getRegionNameAsString() +
+ " because stopping=" + stopping + ", stopped=" + stopped);
+ } else {
+ // Open daughters in parallel.
+ DaughterOpener aOpener = new DaughterOpener(server, a);
+ DaughterOpener bOpener = new DaughterOpener(server, b);
+ aOpener.start();
+ bOpener.start();
+ try {
+ aOpener.join();
+ bOpener.join();
+ } catch (InterruptedException e) {
+ throw (InterruptedIOException)new InterruptedIOException().initCause(e);
+ }
+ if (aOpener.getException() != null) {
+ throw new IOException("Failed " +
+ aOpener.getName(), aOpener.getException());
+ }
+ if (bOpener.getException() != null) {
+ throw new IOException("Failed " +
+ bOpener.getName(), bOpener.getException());
+ }
+ if (services != null) {
+ try {
+ // add 2nd daughter first (see HBASE-4335)
+ services.postOpenDeployTasks(b, server.getCatalogTracker());
+ // Should add it to OnlineRegions
+ services.addToOnlineRegions(b);
+ services.postOpenDeployTasks(a, server.getCatalogTracker());
+ services.addToOnlineRegions(a);
+ } catch (KeeperException ke) {
+ throw new IOException(ke);
+ }
+ }
+ }
+ }
+
+ /**
+ * Finish off split transaction, transition the zknode
+ * @param server Hosting server instance. Can be null when testing (won't try
+ * and update in zk if a null server)
+ * @param services Used to online/offline regions.
+ * @param a first daughter region
+ * @param b second daughter region
+ * @throws IOException If thrown, transaction failed.
+ * Call {@link #rollback(Server, RegionServerServices)}
+ */
+ /* package */void transitionZKNode(final Server server,
+ final RegionServerServices services, HRegion a, HRegion b)
+ throws IOException {
+ // Tell master about split by updating zk. If we fail, abort.
+ if (server != null && server.getZooKeeper() != null) {
+ try {
+ this.znodeVersion = transitionSplittingNode(server.getZooKeeper(),
+ parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo(),
+ server.getServerName(), this.znodeVersion,
+ RS_ZK_REGION_SPLITTING, RS_ZK_REGION_SPLIT);
+
+ int spins = 0;
+ // Now wait for the master to process the split. We know it's done
+ // when the znode is deleted. The reason we keep tickling the znode is
+ // that it's possible for the master to miss an event.
+ do {
+ if (spins % 10 == 0) {
+ LOG.debug("Still waiting on the master to process the split for " +
+ this.parent.getRegionInfo().getEncodedName());
+ }
+ Thread.sleep(100);
+ // When this returns -1 it means the znode doesn't exist
+ this.znodeVersion = transitionSplittingNode(server.getZooKeeper(),
+ parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo(),
+ server.getServerName(), this.znodeVersion,
+ RS_ZK_REGION_SPLIT, RS_ZK_REGION_SPLIT);
+ spins++;
+ } while (this.znodeVersion != -1 && !server.isStopped()
+ && !services.isStopping());
+ } catch (Exception e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ throw new IOException("Failed telling master about split", e);
+ }
+ }
+
+ // Coprocessor callback
+ if (this.parent.getCoprocessorHost() != null) {
+ this.parent.getCoprocessorHost().postSplit(a,b);
+ }
+
+ // Leaving here, the splitdir with its dross will be in place but since the
+ // split was successful, just leave it; it'll be cleaned when parent is
+ // deleted and cleaned up.
+ }
+
+ /**
+ * Wait for the splitting node to be transitioned from pending_split
+ * to splitting by master. That's how we are sure master has processed
+ * the event and is good with us to move on. If we don't get any update,
+ * we periodically transition the node so that master gets the callback.
+ * If the node is removed or is not in pending_split state any more,
+ * we abort the split.
+ */
+ private int getZKNode(final Server server,
+ final RegionServerServices services) throws IOException {
+ // Wait for the master to process the pending_split.
+ try {
+ int spins = 0;
+ Stat stat = new Stat();
+ ZooKeeperWatcher zkw = server.getZooKeeper();
+ ServerName expectedServer = server.getServerName();
+ String node = parent.getRegionInfo().getEncodedName();
+ while (!(server.isStopped() || services.isStopping())) {
+ if (spins % 5 == 0) {
+ LOG.debug("Still waiting for master to process "
+ + "the pending_split for " + node);
+ transitionSplittingNode(zkw, parent.getRegionInfo(),
+ hri_a, hri_b, expectedServer, -1, RS_ZK_REQUEST_REGION_SPLIT,
+ RS_ZK_REQUEST_REGION_SPLIT);
+ }
+ Thread.sleep(100);
+ spins++;
+ byte [] data = ZKAssign.getDataNoWatch(zkw, node, stat);
+ if (data == null) {
+ throw new IOException("Data is null, splitting node "
+ + node + " no longer exists");
+ }
+ RegionTransition rt = RegionTransition.parseFrom(data);
+ EventType et = rt.getEventType();
+ if (et == RS_ZK_REGION_SPLITTING) {
+ ServerName serverName = rt.getServerName();
+ if (!serverName.equals(expectedServer)) {
+ throw new IOException("Splitting node " + node + " is for "
+ + serverName + ", not us " + expectedServer);
+ }
+ byte [] payloadOfSplitting = rt.getPayload();
+ List<HRegionInfo> splittingRegions = HRegionInfo.parseDelimitedFrom(
+ payloadOfSplitting, 0, payloadOfSplitting.length);
+ assert splittingRegions.size() == 2;
+ HRegionInfo a = splittingRegions.get(0);
+ HRegionInfo b = splittingRegions.get(1);
+ if (!(hri_a.equals(a) && hri_b.equals(b))) {
+ throw new IOException("Splitting node " + node + " is for " + a + ", "
+ + b + ", not expected daughters: " + hri_a + ", " + hri_b);
+ }
+ // Master has processed it.
+ return stat.getVersion();
+ }
+ if (et != RS_ZK_REQUEST_REGION_SPLIT) {
+ throw new IOException("Splitting node " + node
+ + " moved out of splitting to " + et);
+ }
+ }
+ // Server is stopping/stopped
+ throw new IOException("Server is "
+ + (services.isStopping() ? "stopping" : "stopped"));
+ } catch (Exception e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ throw new IOException("Failed getting SPLITTING znode on "
+ + parent.getRegionNameAsString(), e);
+ }
+ }
+
+ /**
+ * Run the transaction.
+ * @param server Hosting server instance. Can be null when testing (won't
+ * try to update zk if server is null).
+ * @param services Used to online/offline regions.
+ * @return Regions created
+ * @throws IOException If thrown, the transaction failed. Call
+ * {@link #rollback(Server, RegionServerServices)}.
+ * @see #rollback(Server, RegionServerServices)
+ */
+ public PairOfSameType<HRegion> execute(final Server server,
+ final RegionServerServices services)
+ throws IOException {
+ PairOfSameType<HRegion> regions = createDaughters(server, services);
+ if (this.parent.getCoprocessorHost() != null) {
+ this.parent.getCoprocessorHost().preSplitAfterPONR();
+ }
+ return stepsAfterPONR(server, services, regions);
+ }
+
+ public PairOfSameType<HRegion> stepsAfterPONR(final Server server,
+ final RegionServerServices services, PairOfSameType<HRegion> regions)
+ throws IOException {
+ openDaughters(server, services, regions.getFirst(), regions.getSecond());
+ transitionZKNode(server, services, regions.getFirst(), regions.getSecond());
+ return regions;
+ }
+
+ private void offlineParentInMetaAndputMetaEntries(CatalogTracker catalogTracker,
+ HRegionInfo parent, HRegionInfo splitA, HRegionInfo splitB,
+ ServerName serverName, List<Mutation> metaEntries) throws IOException {
+ List<Mutation> mutations = metaEntries;
+ HRegionInfo copyOfParent = new HRegionInfo(parent);
+ copyOfParent.setOffline(true);
+ copyOfParent.setSplit(true);
+
+ //Put for parent
+ Put putParent = MetaEditor.makePutFromRegionInfo(copyOfParent);
+ MetaEditor.addDaughtersToPut(putParent, splitA, splitB);
+ mutations.add(putParent);
+
+ //Puts for daughters
+ Put putA = MetaEditor.makePutFromRegionInfo(splitA);
+ Put putB = MetaEditor.makePutFromRegionInfo(splitB);
+
+ addLocation(putA, serverName, 1); //these are new regions, openSeqNum = 1 is fine.
+ addLocation(putB, serverName, 1);
+ mutations.add(putA);
+ mutations.add(putB);
+ MetaEditor.mutateMetaTable(catalogTracker, mutations);
+ }
+
+ public Put addLocation(final Put p, final ServerName sn, long openSeqNum) {
+ p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
+ Bytes.toBytes(sn.getHostAndPort()));
+ p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
+ Bytes.toBytes(sn.getStartcode()));
+ p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER,
+ Bytes.toBytes(openSeqNum));
+ return p;
+ }
+
+ /*
+ * Open daughter region in its own thread.
+ * If we fail, abort this hosting server.
+ */
+ class DaughterOpener extends HasThread {
+ private final Server server;
+ private final HRegion r;
+ private Throwable t = null;
+
+ DaughterOpener(final Server s, final HRegion r) {
+ super((s == null? "null-services": s.getServerName()) +
+ "-daughterOpener=" + r.getRegionInfo().getEncodedName());
+ setDaemon(true);
+ this.server = s;
+ this.r = r;
+ }
+
+ /**
+ * @return Null if the open succeeded, else the exception that caused the
+ * open to fail. Call this after the thread exits or you may get a stale
+ * view of the result.
+ */
+ Throwable getException() {
+ return this.t;
+ }
+
+ @Override
+ public void run() {
+ try {
+ openDaughterRegion(this.server, r);
+ } catch (Throwable t) {
+ this.t = t;
+ }
+ }
+ }
+
+ /**
+ * Open a daughter region, add it to the online list and update meta.
+ * @param server Hosting server instance; may be null when testing.
+ * @param daughter The daughter region to open.
+ * @throws IOException
+ * @throws KeeperException
+ */
+ void openDaughterRegion(final Server server, final HRegion daughter)
+ throws IOException, KeeperException {
+ HRegionInfo hri = daughter.getRegionInfo();
+ LoggingProgressable reporter = server == null ? null
+ : new LoggingProgressable(hri, server.getConfiguration().getLong(
+ "hbase.regionserver.split.daughter.open.log.interval", 10000));
+ daughter.openHRegion(reporter);
+ }
+
+ static class LoggingProgressable implements CancelableProgressable {
+ private final HRegionInfo hri;
+ private long lastLog = -1;
+ private final long interval;
+
+ LoggingProgressable(final HRegionInfo hri, final long interval) {
+ this.hri = hri;
+ this.interval = interval;
+ }
+
+ @Override
+ public boolean progress() {
+ long now = System.currentTimeMillis();
+ if (now - lastLog > this.interval) {
+ LOG.info("Opening " + this.hri.getRegionNameAsString());
+ this.lastLog = now;
+ }
+ return true;
+ }
+ }
+
+ private void splitStoreFiles(final Map<byte[], List<StoreFile>> hstoreFilesToSplit)
+ throws IOException {
+ if (hstoreFilesToSplit == null) {
+ // Could be null because close didn't succeed -- for now consider it fatal
+ throw new IOException("Close returned empty list of StoreFiles");
+ }
+ // The following code sets up a thread pool executor with as many threads
+ // as there are files to split. It then submits all the tasks, waits for
+ // completion and finally checks for any exceptions.
+ int nbFiles = hstoreFilesToSplit.size();
+ if (nbFiles == 0) {
+ // No files need to be split.
+ return;
+ }
+ ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+ builder.setNameFormat("StoreFileSplitter-%1$d");
+ ThreadFactory factory = builder.build();
+ ThreadPoolExecutor threadPool =
+ (ThreadPoolExecutor) Executors.newFixedThreadPool(nbFiles, factory);
+ List<Future<Void>> futures = new ArrayList<Future<Void>>(nbFiles);
+
+ // Split each store file.
+ for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) {
+ for (StoreFile sf: entry.getValue()) {
+ StoreFileSplitter sfs = new StoreFileSplitter(entry.getKey(), sf);
+ futures.add(threadPool.submit(sfs));
+ }
+ }
+ // Shutdown the pool
+ threadPool.shutdown();
+
+ // Wait for all the tasks to finish
+ try {
+ boolean stillRunning = !threadPool.awaitTermination(
+ this.fileSplitTimeout, TimeUnit.MILLISECONDS);
+ if (stillRunning) {
+ threadPool.shutdownNow();
+ // Wait for the pool to shut down completely.
+ while (!threadPool.isTerminated()) {
+ Thread.sleep(50);
+ }
+ throw new IOException("Took too long to split the" +
+ " files and create the references, aborting split");
+ }
+ } catch (InterruptedException e) {
+ throw (InterruptedIOException)new InterruptedIOException().initCause(e);
+ }
+
+ // Look for any exception
+ for (Future<Void> future: futures) {
+ try {
+ future.get();
+ } catch (InterruptedException e) {
+ throw (InterruptedIOException)new InterruptedIOException().initCause(e);
+ } catch (ExecutionException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
+ /**
+ * Utility class used to do the file splitting / reference writing
+ * in parallel instead of sequentially.
+ */
+ class StoreFileSplitter implements Callable<Void> {
+ private final byte[] family;
+ private final StoreFile sf;
+
+ /**
+ * Constructor that takes what it needs to split.
+ * @param family Family that contains the store file
+ * @param sf The store file to split
+ */
+ public StoreFileSplitter(final byte[] family, final StoreFile sf) {
+ this.sf = sf;
+ this.family = family;
+ }
+
+ public Void call() throws IOException {
+ splitStoreFile(family, sf);
+ return null;
+ }
+ }
+
+ private void splitStoreFile(final byte[] family, final StoreFile sf) throws IOException {
+ HRegionFileSystem fs = this.parent.getRegionFileSystem();
+ String familyName = Bytes.toString(family);
+ splitStoreFile(this.hri_a, familyName, sf, this.splitrow, false, fs);
+ splitStoreFile(this.hri_b, familyName, sf, this.splitrow, true, fs);
+ }
+
+ private Path splitStoreFile(HRegionInfo hri, String familyName, StoreFile f, byte[] splitRow,
+ boolean top, HRegionFileSystem fs) throws IOException {
+ f.closeReader(true);
+ Path splitDir =
+ new Path(new Path(new Path(fs.getRegionDir(), HRegionFileSystem.REGION_SPLITS_DIR),
+ hri.getEncodedName()), familyName);
+ // A reference to the top or bottom half of the store file, depending on
+ // the top flag.
+ Reference r =
+ top ? Reference.createTopReference(splitRow) : Reference
+ .createBottomReference(splitRow);
+ // Add the referred-to region's name as a dot-separated suffix.
+ // See the REF_NAME_REGEX regex in HBase's store file code. The
+ // referred-to region's name appears in the path of the passed-in
+ // <code>f</code>: the parent dir is the family, and the directory above
+ // that is the region name.
+ String parentRegionName = this.parent.getRegionInfo().getEncodedName();
+ // Write the reference with the same file id, with the parent region's
+ // name as suffix, into the new region's location (under the same family).
+ Path p = new Path(splitDir, f.getPath().getName() + "." + parentRegionName);
+ return r.write(fs.getFileSystem(), p);
+ }
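+
+ // Illustrative example (not part of the original patch), with hypothetical
+ // names and assuming HRegionFileSystem.REGION_SPLITS_DIR resolves to
+ // ".splits": splitting store file "abc123" of family "0" in parent region
+ // "deadbeef" writes one reference file per daughter, e.g.
+ //   <parentRegionDir>/.splits/<daughterAEncodedName>/0/abc123.deadbeef
+ //   <parentRegionDir>/.splits/<daughterBEncodedName>/0/abc123.deadbeef
+ // with the bottom reference for daughter a and the top for daughter b.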
+
+ /**
+ * @param server Hosting server instance (may be null when testing).
+ * @param services Used to online/offline regions.
+ * @return True if we successfully rolled back; false if we got to the point
+ * of no return and so now need to abort the server to minimize damage.
+ * @throws IOException If thrown, rollback failed. Take drastic action.
+ */
+ @SuppressWarnings("deprecation")
+ public boolean rollback(final Server server, final RegionServerServices services)
+ throws IOException {
+ // Coprocessor callback
+ if (this.parent.getCoprocessorHost() != null) {
+ this.parent.getCoprocessorHost().preRollBackSplit();
+ }
+
+ boolean result = true;
+ ListIterator<JournalEntry> iterator =
+ this.journal.listIterator(this.journal.size());
+ // Iterate in reverse.
+ while (iterator.hasPrevious()) {
+ JournalEntry je = iterator.previous();
+ switch(je) {
+
+ case SET_SPLITTING_IN_ZK:
+ if (server != null && server.getZooKeeper() != null) {
+ cleanZK(server, this.parent.getRegionInfo());
+ }
+ break;
+
+ case CREATE_SPLIT_DIR:
+ this.parent.writestate.writesEnabled = true;
+ this.parent.getRegionFileSystem().cleanupSplitsDir();
+ break;
+
+ case CLOSED_PARENT_REGION:
+ try {
+ // So, this returns a seqid but if we just closed and then reopened, we
+ // should be ok. On close, we flushed using sequenceid obtained from
+ // hosting regionserver so no need to propagate the sequenceid returned
+ // out of initialize below up into regionserver as we normally do.
+ // TODO: Verify.
+ this.parent.initialize();
+ } catch (IOException e) {
+ LOG.error("Failed rollbacking CLOSED_PARENT_REGION of region " +
+ this.parent.getRegionNameAsString(), e);
+ throw new RuntimeException(e);
+ }
+ break;
+
+ case STARTED_REGION_A_CREATION:
+ this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_a);
+ break;
+
+ case STARTED_REGION_B_CREATION:
+ this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_b);
+ break;
+
+ case OFFLINED_PARENT:
+ if (services != null) services.addToOnlineRegions(this.parent);
+ break;
+
+ case PONR:
+ // We got to the point-of-no-return so we need to just abort. Return
+ // immediately. Do not clean up created daughter regions. They need
+ // to be in place so we don't delete the parent region mistakenly.
+ // See HBASE-3872.
+ return false;
+
+ default:
+ throw new RuntimeException("Unhandled journal entry: " + je);
+ }
+ }
+ // Coprocessor callback
+ if (this.parent.getCoprocessorHost() != null) {
+ this.parent.getCoprocessorHost().postRollBackSplit();
+ }
+ return result;
+ }
+
+ HRegionInfo getFirstDaughter() {
+ return hri_a;
+ }
+
+ HRegionInfo getSecondDaughter() {
+ return hri_b;
+ }
+
+ private static void cleanZK(final Server server, final HRegionInfo hri) {
+ try {
+ // Only delete if it's in the expected state; it could have been hijacked.
+ if (!ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(),
+ RS_ZK_REQUEST_REGION_SPLIT, server.getServerName())) {
+ ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(),
+ RS_ZK_REGION_SPLITTING, server.getServerName());
+ }
+ } catch (KeeperException.NoNodeException e) {
+ LOG.info("Failed cleanup zk node of " + hri.getRegionNameAsString(), e);
+ } catch (KeeperException e) {
+ server.abort("Failed cleanup of " + hri.getRegionNameAsString(), e);
+ }
+ }
+
+ /**
+ * Creates a new ephemeral node in the PENDING_SPLIT state for the specified
+ * region. Create it as ephemeral in case the region server dies mid-split.
+ *
+ * <p>Does not transition nodes from other states. If a node already exists
+ * for this region, a {@link NodeExistsException} will be thrown.
+ *
+ * @param zkw zk reference
+ * @param region region whose splitting node is to be created
+ * @param serverName server event originates from
+ * @param a Daughter a of split
+ * @param b Daughter b of split
+ * @throws KeeperException
+ * @throws IOException
+ */
+ public static void createNodeSplitting(final ZooKeeperWatcher zkw, final HRegionInfo region,
+ final ServerName serverName, final HRegionInfo a,
+ final HRegionInfo b) throws KeeperException, IOException {
+ LOG.debug(zkw.prefix("Creating ephemeral node for " +
+ region.getEncodedName() + " in PENDING_SPLIT state"));
+ byte [] payload = HRegionInfo.toDelimitedByteArray(a, b);
+ RegionTransition rt = RegionTransition.createRegionTransition(
+ RS_ZK_REQUEST_REGION_SPLIT, region.getRegionName(), serverName, payload);
+ String node = ZKAssign.getNodeName(zkw, region.getEncodedName());
+ if (!ZKUtil.createEphemeralNodeAndWatch(zkw, node, rt.toByteArray())) {
+ throw new IOException("Failed create of ephemeral " + node);
+ }
+ }
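+
+ // Illustrative summary (not part of the original patch) of the znode
+ // lifecycle driven by the methods in this class:
+ //   createNodeSplitting:  (no node) -> RS_ZK_REQUEST_REGION_SPLIT
+ //   master acknowledges:  RS_ZK_REQUEST_REGION_SPLIT -> RS_ZK_REGION_SPLITTING
+ //                         (observed by getZKNode)
+ //   transitionZKNode:     RS_ZK_REGION_SPLITTING -> RS_ZK_REGION_SPLIT
+ //   master deletes the znode once it has processed the split.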
+
+ /**
+ * Transitions an existing ephemeral node for the specified region which is
+ * currently in the begin state to be in the end state. Master cleans up the
+ * final SPLIT znode when it reads it (or if we crash, zk will clean it up).
+ *
+ * <p>Does not transition nodes from other states. If for some reason the
+ * node could not be transitioned, the method returns -1. If the transition
+ * is successful, the version of the node after transition is returned.
+ *
+ * <p>This method can fail and return -1 for three different reasons:
+ * <ul><li>Node for this region does not exist</li>
+ * <li>Node for this region is not in the begin state</li>
+ * <li>After verifying the begin state, update fails because of wrong version
+ * (this should never actually happen since an RS only does this transition
+ * following a transition to the begin state. If two RS are conflicting, one would
+ * fail the original transition to the begin state and not this transition)</li>
+ * </ul>
+ *
+ * <p>Does not set any watches.
+ *
+ * <p>This method should only be used by a RegionServer when splitting a region.
+ *
+ * @param zkw zk reference
+ * @param parent region being split
+ * @param a Daughter a of split
+ * @param b Daughter b of split
+ * @param serverName server event originates from
+ * @param znodeVersion expected version of data before modification
+ * @param beginState the expected current state the znode should be
+ * @param endState the state to transition to
+ * @return version of node after transition, -1 if unsuccessful transition
+ * @throws KeeperException if unexpected zookeeper exception
+ * @throws IOException
+ */
+ public static int transitionSplittingNode(ZooKeeperWatcher zkw,
+ HRegionInfo parent, HRegionInfo a, HRegionInfo b, ServerName serverName,
+ final int znodeVersion, final EventType beginState,
+ final EventType endState) throws KeeperException, IOException {
+ byte [] payload = HRegionInfo.toDelimitedByteArray(a, b);
+ return ZKAssign.transitionNode(zkw, parent, serverName,
+ beginState, endState, znodeVersion, payload);
+ }
+}
\ No newline at end of file
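
For orientation, a minimal sketch (not part of the patch) of the simplest way
a caller could drive IndexSplitTransaction end to end; LocalIndexSplitter
below instead splits the work across the before/after-PONR coprocessor hooks.
The server and services variables are assumed to be in scope:

    IndexSplitTransaction st = new IndexSplitTransaction(indexRegion, splitKey);
    if (st.prepare()) {
      try {
        // Runs createDaughters, the preSplitAfterPONR callback, then
        // stepsAfterPONR (open daughters and transition the znode).
        st.execute(server, services);
      } catch (IOException e) {
        // Past the point of no return, rollback returns false and the
        // server must be aborted to minimize damage.
        if (!st.rollback(server, services)) {
          server.abort("Split rollback failed past the point of no return", e);
        }
      }
    }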
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4fa6146b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java
new file mode 100644
index 0000000..1afe6c7
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.catalog.MetaEditor;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.PairOfSameType;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+public class LocalIndexSplitter extends BaseRegionObserver {
+
+ private static final Log LOG = LogFactory.getLog(LocalIndexSplitter.class);
+
+ private IndexSplitTransaction st = null;
+ private PairOfSameType<HRegion> daughterRegions = null;
+
+ @Override
+ public void preSplitBeforePONR(ObserverContext<RegionCoprocessorEnvironment> ctx,
+ byte[] splitKey, List<Mutation> metaEntries) throws IOException {
+ RegionCoprocessorEnvironment environment = ctx.getEnvironment();
+ HTableDescriptor tableDesc = environment.getRegion().getTableDesc();
+ if (SchemaUtil.isMetaTable(tableDesc.getName())
+ || SchemaUtil.isSequenceTable(tableDesc.getName())) {
+ return;
+ }
+ RegionServerServices rss = environment.getRegionServerServices();
+ if (tableDesc.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES) == null
+ || !Boolean.TRUE.equals(PDataType.BOOLEAN.toObject(tableDesc
+ .getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES)))) {
+ HRegion indexRegion = IndexUtil.getIndexRegion(environment);
+ if (indexRegion == null) return;
+ st = new IndexSplitTransaction(indexRegion, splitKey);
+ if (!st.prepare()) {
+ LOG.error("Prepare for the table " + indexRegion.getTableDesc().getNameAsString()
+ + " failed. So returning null. ");
+ ctx.bypass();
+ return;
+ }
+ indexRegion.forceSplit(splitKey);
+ daughterRegions = st.stepsBeforePONR(rss, rss, false);
+ HRegionInfo copyOfParent = new HRegionInfo(indexRegion.getRegionInfo());
+ copyOfParent.setOffline(true);
+ copyOfParent.setSplit(true);
+ // Put for parent
+ Put putParent = MetaEditor.makePutFromRegionInfo(copyOfParent);
+ MetaEditor.addDaughtersToPut(putParent, daughterRegions.getFirst().getRegionInfo(),
+ daughterRegions.getSecond().getRegionInfo());
+ metaEntries.add(putParent);
+ // Puts for daughters
+ Put putA = MetaEditor.makePutFromRegionInfo(daughterRegions.getFirst().getRegionInfo());
+ Put putB =
+ MetaEditor.makePutFromRegionInfo(daughterRegions.getSecond().getRegionInfo());
+ st.addLocation(putA, rss.getServerName(), 1);
+ st.addLocation(putB, rss.getServerName(), 1);
+ metaEntries.add(putA);
+ metaEntries.add(putB);
+ }
+ }
+
+ @Override
+ public void preSplitAfterPONR(ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ if (st == null || daughterRegions == null) return;
+ RegionCoprocessorEnvironment environment = ctx.getEnvironment();
+ HRegionServer rs = (HRegionServer) environment.getRegionServerServices();
+ st.stepsAfterPONR(rs, rs, daughterRegions);
+ }
+}
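
For completeness, a hedged sketch of attaching this observer to the local
index table descriptor; the real wiring lives in ConnectionQueryServicesImpl
(also touched by this commit), so the helper below is hypothetical and
illustrative only:

    // Hypothetical helper, not Phoenix API: registers the split observer on
    // a local index table descriptor before the table is created.
    private static HTableDescriptor withLocalIndexSplitter(HTableDescriptor desc)
        throws IOException {
      desc.addCoprocessor(LocalIndexSplitter.class.getName());
      return desc;
    }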