You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by je...@apache.org on 2014/08/08 23:08:48 UTC
git commit: PHOENIX-1147: Ensure data table is sent to client if
index table changes states
Repository: phoenix
Updated Branches:
refs/heads/3.0 fa72e44a6 -> 0272e2841
PHOENIX-1147: Ensure data table is sent to client if index table changes states
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/0272e284
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/0272e284
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/0272e284
Branch: refs/heads/3.0
Commit: 0272e2841b08d40c3bfda6bbdf401ff0f9c40c90
Parents: fa72e44
Author: Jeffrey Zhong <je...@apache.org>
Authored: Tue Aug 5 14:55:57 2014 -0700
Committer: Jeffrey Zhong <je...@apache.org>
Committed: Fri Aug 8 14:08:00 2014 -0700
----------------------------------------------------------------------
.../end2end/index/MutableIndexFailureIT.java | 201 ++++++++++++++++++-
.../coprocessor/MetaDataEndpointImpl.java | 23 ++-
.../coprocessor/MetaDataRegionObserver.java | 22 +-
.../apache/phoenix/execute/MutationState.java | 4 +-
.../index/PhoenixIndexFailurePolicy.java | 8 +-
.../apache/phoenix/schema/MetaDataClient.java | 6 +-
.../org/apache/phoenix/schema/PTableImpl.java | 6 +-
.../org/apache/phoenix/schema/TableRef.java | 6 +-
8 files changed, 248 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0272e284/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index 8fa8e20..bbf8a6b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -27,30 +27,65 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import java.io.IOException;
import java.sql.Connection;
+import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
import java.util.Properties;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseCluster;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixTestDriver;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.PDataType;
import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.StringUtil;
+import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -65,32 +100,39 @@ import org.junit.experimental.categories.Category;
*/
@Category(NeedsOwnMiniClusterTest.class)
public class MutableIndexFailureIT extends BaseTest {
+ private static final int NUM_SLAVES = 4;
private static String url;
private static PhoenixTestDriver driver;
private static HBaseTestingUtility util;
+ private Timer scheduleTimer;
- private static final String SCHEMA_NAME = "";
+ private static final String SCHEMA_NAME = "S";
private static final String INDEX_TABLE_NAME = "I";
private static final String DATA_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, "T");
private static final String INDEX_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, "I");
- @BeforeClass
- public static void doSetup() throws Exception {
+ @Before
+ public void doSetup() throws Exception {
Configuration conf = HBaseConfiguration.create();
setUpConfigForMiniCluster(conf);
conf.setInt("hbase.client.retries.number", 2);
conf.setInt("hbase.client.pause", 5000);
+ conf.setInt("hbase.balancer.period", Integer.MAX_VALUE);
conf.setLong(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB, 0);
util = new HBaseTestingUtility(conf);
- util.startMiniCluster();
+ util.startMiniCluster(NUM_SLAVES);
String clientPort = util.getConfiguration().get(QueryServices.ZOOKEEPER_PORT_ATTRIB);
url = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST + JDBC_PROTOCOL_SEPARATOR + clientPort
+ JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
driver = initAndRegisterDriver(url, ReadOnlyProps.EMPTY_PROPS);
}
- @AfterClass
- public static void tearDown() throws Exception {
+ @After
+ public void tearDown() throws Exception {
+ if(scheduleTimer != null){
+ scheduleTimer.cancel();
+ scheduleTimer = null;
+ }
try {
destroyDriver(driver);
} finally {
@@ -158,6 +200,7 @@ public class MutableIndexFailureIT extends BaseTest {
assertEquals(PIndexState.DISABLE.toString(), rs.getString("INDEX_STATE"));
assertFalse(rs.next());
+ // Verify UPSERT on data table still work after index is disabled
stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
stmt.setString(1, "a3");
stmt.setString(2, "x3");
@@ -165,11 +208,18 @@ public class MutableIndexFailureIT extends BaseTest {
stmt.execute();
conn.commit();
+ query = "SELECT v2 FROM " + DATA_TABLE_FULL_NAME + " where v1='x3'";
+ rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+ assertTrue(QueryUtil.getExplainPlan(rs).contains("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + DATA_TABLE_FULL_NAME));
+ rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+
// recreate index table
admin.createTable(indexTableDesc);
do {
Thread.sleep(15 * 1000); // sleep 15 secs
- rs = conn.getMetaData().getTables(null, "", INDEX_TABLE_NAME, new String[] {PTableType.INDEX.toString()});
+ rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
+ new String[] { PTableType.INDEX.toString() });
assertTrue(rs.next());
if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
break;
@@ -181,8 +231,143 @@ public class MutableIndexFailureIT extends BaseTest {
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
- // using 2 here because we onluy partially build index from where we failed and the oldest
+ // using 2 here because we only partially build index from where we failed and the oldest
// index row has been deleted when we dropped the index table during test.
assertEquals(2, rs.getInt(1));
}
+
+ @Test(timeout=300000)
+ public void testWriteFailureWithRegionServerDown() throws Exception {
+ String query;
+ ResultSet rs;
+
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = driver.connect(url, props);
+ conn.setAutoCommit(false);
+ conn.createStatement().execute(
+ "CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+ query = "SELECT * FROM " + DATA_TABLE_FULL_NAME;
+ rs = conn.createStatement().executeQuery(query);
+ assertFalse(rs.next());
+
+ conn.createStatement().execute(
+ "CREATE INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)");
+ query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME;
+ rs = conn.createStatement().executeQuery(query);
+ assertFalse(rs.next());
+
+ // Verify the metadata for index is correct.
+ rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
+ new String[] { PTableType.INDEX.toString() });
+ assertTrue(rs.next());
+ assertEquals(INDEX_TABLE_NAME, rs.getString(3));
+ assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
+ assertFalse(rs.next());
+
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
+ stmt.setString(1, "a");
+ stmt.setString(2, "x");
+ stmt.setString(3, "1");
+ stmt.execute();
+ conn.commit();
+
+ // find a RS which doesn't has CATALOG table
+ byte[] catalogTable = Bytes.toBytes("SYSTEM.CATALOG");
+ byte[] indexTable = Bytes.toBytes(INDEX_TABLE_FULL_NAME);
+ final HBaseCluster cluster = this.util.getHBaseCluster();
+ Collection<ServerName> rss = cluster.getClusterStatus().getServers();
+ HBaseAdmin admin = this.util.getHBaseAdmin();
+ List<HRegionInfo> regions = admin.getTableRegions(catalogTable);
+ ServerName catalogRS = cluster.getServerHoldingRegion(regions.get(0).getRegionName());
+ ServerName metaRS = cluster.getServerHoldingMeta();
+ ServerName rsToBeKilled = null;
+
+ // find first RS isn't holding META or CATALOG table
+ for(ServerName curRS : rss) {
+ if(!curRS.equals(catalogRS) && !metaRS.equals(curRS)) {
+ rsToBeKilled = curRS;
+ break;
+ }
+ }
+ assertTrue(rsToBeKilled != null);
+
+ regions = admin.getTableRegions(indexTable);
+ final HRegionInfo indexRegion = regions.get(0);
+ final ServerName dstRS = rsToBeKilled;
+ admin.move(indexRegion.getEncodedNameAsBytes(), Bytes.toBytes(rsToBeKilled.getServerName()));
+
+ long started = System.currentTimeMillis();
+ while(true) {
+ ServerName sn = cluster.getServerHoldingRegion(indexRegion.getRegionName());
+ if (sn != null && sn.equals(dstRS)) {
+ break;
+ }
+ if((System.currentTimeMillis() - started) > 30000) {
+ assertTrue("Timeout waiting for " + indexRegion + " move to " + rsToBeKilled, false);
+ }
+ Thread.sleep(200);
+ }
+
+ // use timer sending updates in every 10ms
+ this.scheduleTimer = new Timer(true);
+ this.scheduleTimer.schedule(new SendingUpdatesScheduleTask(conn), 0, 10);
+ // let timer sending some updates
+ Thread.sleep(100);
+
+ // kill RS hosting index table
+ this.util.getHBaseCluster().killRegionServer(rsToBeKilled);
+
+ // wait for index table completes recovery
+ this.util.waitUntilAllRegionsAssigned(indexTable);
+
+ // Verify the metadata for index is correct.
+ do {
+ Thread.sleep(15 * 1000); // sleep 15 secs
+ rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
+ new String[] { PTableType.INDEX.toString() });
+ assertTrue(rs.next());
+ if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
+ break;
+ }
+ } while(true);
+ this.scheduleTimer.cancel();
+
+ assertEquals(cluster.getClusterStatus().getDeadServers(), 1);
+ }
+
+ static class SendingUpdatesScheduleTask extends TimerTask {
+ private static final Log LOG = LogFactory.getLog(SendingUpdatesScheduleTask.class);
+
+ // inProgress is to prevent timer from invoking a new task while previous one is still
+ // running
+ private final static AtomicInteger inProgress = new AtomicInteger(0);
+ private final Connection conn;
+ private int inserts = 0;
+
+ public SendingUpdatesScheduleTask(Connection conn) {
+ this.conn = conn;
+ }
+
+ public void run() {
+ if(inProgress.get() > 0){
+ return;
+ }
+
+ try {
+ inProgress.incrementAndGet();
+ inserts++;
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
+ stmt.setString(1, "a" + inserts);
+ stmt.setString(2, "x" + inserts);
+ stmt.setString(3, String.valueOf(inserts));
+ stmt.execute();
+ conn.commit();
+ } catch (Throwable t) {
+ LOG.warn("ScheduledBuildIndexTask failed!", t);
+ } finally {
+ inProgress.decrementAndGet();
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0272e284/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 09625ec..aae98b2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -69,6 +69,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
@@ -150,6 +151,7 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
private static final KeyValue MULTI_TENANT_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, MULTI_TENANT_BYTES);
private static final KeyValue VIEW_TYPE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_TYPE_BYTES);
private static final KeyValue VIEW_INDEX_ID_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_INDEX_ID_BYTES);
+ private static final KeyValue INDEX_DISABLE_TIMESTAMP_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES);
private static final List<KeyValue> TABLE_KV_COLUMNS = Arrays.<KeyValue>asList(
TABLE_TYPE_KV,
TABLE_SEQ_NUM_KV,
@@ -164,7 +166,8 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
DISABLE_WAL_KV,
MULTI_TENANT_KV,
VIEW_TYPE_KV,
- VIEW_INDEX_ID_KV
+ VIEW_INDEX_ID_KV,
+ INDEX_DISABLE_TIMESTAMP_KV
);
static {
Collections.sort(TABLE_KV_COLUMNS, KeyValue.COMPARATOR);
@@ -374,6 +377,7 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
tableKeyValues[j++] = kv;
i++;
} else if (cmp > 0) {
+ timeStamp = Math.max(timeStamp, kv.getTimestamp());
tableKeyValues[j++] = null;
} else {
i++; // shouldn't happen - means unexpected KV in system table header row
@@ -1138,6 +1142,7 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
try {
Get get = new Get(key);
get.setTimeRange(PTable.INITIAL_SEQ_NUM, timeStamp);
+ get.addColumn(TABLE_FAMILY_BYTES, DATA_TABLE_NAME_BYTES);
get.addColumn(TABLE_FAMILY_BYTES, INDEX_STATE_BYTES);
get.addColumn(TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES);
Result currentResult = region.get(get);
@@ -1146,6 +1151,7 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
}
KeyValue currentStateKV = currentResult.getColumnLatest(TABLE_FAMILY_BYTES, INDEX_STATE_BYTES);
KeyValue currentDisableTimeStamp = currentResult.getColumnLatest(TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES);
+ KeyValue dataTableKV = currentResult.getColumnLatest(TABLE_FAMILY_BYTES, DATA_TABLE_NAME_BYTES);
PIndexState currentState = PIndexState.fromSerializedValue(currentStateKV.getBuffer()[currentStateKV.getValueOffset()]);
@@ -1192,10 +1198,23 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
}
if (currentState != newState) {
- region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]>emptySet());
+ byte[] dataTableKey = null;
+ if(dataTableKV != null) {
+ dataTableKey = SchemaUtil.getTableKey(tenantId, schemaName, dataTableKV.getValue());
+ }
+ if(dataTableKey != null) {
+ // insert an empty KV to trigger time stamp update on data table row
+ Put p = new Put(dataTableKey);
+ p.add(TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, timeStamp, ByteUtil.EMPTY_BYTE_ARRAY);
+ tableMetadata.add(p);
+ }
+ region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet());
// Invalidate from cache
Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.getEnvironment()).getMetaDataCache();
metaDataCache.invalidate(cacheKey);
+ if(dataTableKey != null) {
+ metaDataCache.invalidate(new ImmutableBytesPtr(dataTableKey));
+ }
}
// Get client timeStamp from mutations, since it may get updated by the mutateRowsWithLocks call
long currentTime = MetaDataUtil.getClientTimeStamp(tableMetadata);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0272e284/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index 66b60ce..2820e59 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -123,12 +123,14 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
*
*/
public static class BuildIndexScheduleTask extends TimerTask {
- // inProgress is to prevent timer from invoking a new task while previous one is still running
- private final static AtomicInteger inProgress = new AtomicInteger(0);
- RegionCoprocessorEnvironment env;
- public BuildIndexScheduleTask(RegionCoprocessorEnvironment env) {
- this.env = env;
- }
+ // inProgress is to prevent timer from invoking a new task while previous one is still
+ // running
+ private final static AtomicInteger inProgress = new AtomicInteger(0);
+ RegionCoprocessorEnvironment env;
+
+ public BuildIndexScheduleTask(RegionCoprocessorEnvironment env) {
+ this.env = env;
+ }
private String getJdbcUrl() {
String zkQuorum = this.env.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM);
@@ -205,6 +207,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
byte[][] rowKeyMetaData = new byte[3][];
SchemaUtil.getVarChars(r.getRow(), 3, rowKeyMetaData);
+ byte[] schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
byte[] indexTable = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
// validity check
@@ -216,8 +219,11 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
if (conn == null) {
conn = DriverManager.getConnection(getJdbcUrl()).unwrap(PhoenixConnection.class);
}
- PTable dataPTable = PhoenixRuntime.getTable(conn, Bytes.toString(dataTable));
- PTable indexPTable = PhoenixRuntime.getTable(conn, Bytes.toString(indexTable));
+
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTable);
+ String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTable);
+ PTable dataPTable = PhoenixRuntime.getTable(conn, dataTableFullName);
+ PTable indexPTable = PhoenixRuntime.getTable(conn, indexTableFullName);
if (!MetaDataUtil.tableRegionsOnline(this.env.getConfiguration(), indexPTable)) {
LOG.debug("Index rebuild has been skipped because not all regions of index table="
+ indexPTable.getName() + " are online.");
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0272e284/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 31ab2f2..911e3ea 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -117,6 +117,7 @@ public class MutationState implements SQLCloseable {
public long getUpdateCount() {
return sizeOffset + numRows;
}
+
/**
* Combine a newer mutation with this one, where in the event of overlaps,
* the newer one will take precedence.
@@ -284,6 +285,7 @@ public class MutationState implements SQLCloseable {
serverTimeStamp = timestamp;
if (result.wasUpdated()) {
// TODO: use bitset?
+ table = connection.getMetaDataCache().getTable(new PTableKey(tenantId, table.getName().getString()));
PColumn[] columns = new PColumn[table.getColumns().size()];
for (Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry : entry.getValue().entrySet()) {
Map<PColumn,byte[]> valueEntry = rowEntry.getValue();
@@ -293,12 +295,12 @@ public class MutationState implements SQLCloseable {
}
}
}
- table = connection.getMetaDataCache().getTable(new PTableKey(tenantId, table.getName().getString()));
for (PColumn column : columns) {
if (column != null) {
table.getColumnFamily(column.getFamilyName().getString()).getColumn(column.getName().getString());
}
}
+ tableRef.setTable(table);
}
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0272e284/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
index cf8335e..149dad2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
@@ -72,6 +72,7 @@ public class PhoenixIndexFailurePolicy extends KillServerOnFailurePolicy {
@Override
public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause) throws IOException {
Set<HTableInterfaceReference> refs = attempted.asMap().keySet();
+ String indexTableName = "";
try {
for (HTableInterfaceReference ref : refs) {
long minTimeStamp = 0;
@@ -89,7 +90,7 @@ public class PhoenixIndexFailurePolicy extends KillServerOnFailurePolicy {
}
// Disable the index by using the updateIndexState method of MetaDataProtocol end point coprocessor.
- String indexTableName = ref.getTableName();
+ indexTableName = ref.getTableName();
byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName);
HTableInterface systemTable = env.getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
MetaDataProtocol mdProxy = systemTable.coprocessorProxy(MetaDataProtocol.class, indexTableKey);
@@ -103,13 +104,14 @@ public class PhoenixIndexFailurePolicy extends KillServerOnFailurePolicy {
MetaDataMutationResult result = mdProxy.updateIndexState(tableMetadata);
if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
LOG.warn("Attempt to disable index " + indexTableName + " failed with code = " + result.getMutationCode() + ". Will use default failure policy instead.");
- super.handleFailure(attempted, cause);
- throw new DoNotRetryIOException("Attemp to writes to " + indexTableName + " failed.", cause);
+ throw new DoNotRetryIOException("Attemp to disable " + indexTableName + " failed.");
}
LOG.info("Successfully disabled index " + indexTableName + " due to an exception while writing updates.", cause);
}
} catch (Throwable t) {
+ LOG.warn("handleFailure failed", t);
super.handleFailure(attempted, cause);
+ throw new DoNotRetryIOException("Attemp to writes to " + indexTableName + " failed.", cause);
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0272e284/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 39bd21f..340fe96 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1353,12 +1353,12 @@ public class MetaDataClient {
.setSchemaName(schemaName).setTableName(tableName).build().buildException();
default:
try {
- // TODO: should we update the parent table by removing the index?
- connection.removeTable(tenantId, tableName);
+ connection.removeTable(tenantId, SchemaUtil.getTableName(schemaName, tableName));
} catch (TableNotFoundException ignore) { } // Ignore - just means wasn't cached
// TODO: we need to drop the index data when a view is dropped
boolean dropMetaData = connection.getQueryServices().getProps().getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA);
+
if (result.getTable() != null && tableType != PTableType.VIEW) {
connection.setAutoCommit(true);
PTable table = result.getTable();
@@ -1402,7 +1402,7 @@ public class MetaDataClient {
PName tenantId = connection.getTenantId();
switch (mutationCode) {
case TABLE_NOT_FOUND:
- connection.removeTable(tenantId, tableName);
+ connection.removeTable(tenantId, SchemaUtil.getTableName(schemaName, tableName));
throw new TableNotFoundException(schemaName, tableName);
case UNALLOWED_TABLE_MUTATION:
String columnName = null;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0272e284/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index e348a4c..74b6c41 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -116,6 +116,8 @@ public class PTableImpl implements PTable {
private int estimatedSize;
public PTableImpl() {
+ this.indexes = Collections.emptyList();
+ this.physicalNames = Collections.emptyList();
}
public PTableImpl(PName tenantId, String schemaName, String tableName, long timestamp, List<PColumnFamily> families) { // For base table of mapped VIEW
@@ -138,7 +140,7 @@ public class PTableImpl implements PTable {
familyByString.put(family.getName().getString(), family);
}
this.families = families;
- this.physicalNames = Collections.emptyList();;
+ this.physicalNames = Collections.emptyList();
}
public PTableImpl(long timeStamp) { // For delete marker
@@ -938,4 +940,4 @@ public class PTableImpl implements PTable {
public PTableKey getKey() {
return key;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0272e284/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java
index e820bb3..e58bb38 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.hbase.HConstants;
public final class TableRef {
- private final PTable table;
+ private PTable table;
private final String alias;
private final long upperBoundTimeStamp;
private final long lowerBoundTimeStamp;
@@ -56,6 +56,10 @@ public final class TableRef {
public PTable getTable() {
return table;
}
+
+ public void setTable(PTable value) {
+ this.table = value;
+ }
public String getTableAlias() {
return alias;