You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ra...@apache.org on 2017/09/05 18:05:13 UTC
phoenix git commit: PHOENIX-3496 Figure out why
LocalIndexIT#testLocalIndexRoundTrip is flapping(Rajeshbabu)
Repository: phoenix
Updated Branches:
refs/heads/master 0a3ef6c1b -> c8cbb5e5e
PHOENIX-3496 Figure out why LocalIndexIT#testLocalIndexRoundTrip is flapping(Rajeshbabu)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c8cbb5e5
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c8cbb5e5
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c8cbb5e5
Branch: refs/heads/master
Commit: c8cbb5e5e196299d5cc50385bd5ebb3791170d2f
Parents: 0a3ef6c
Author: Rajeshbabu Chintaguntla <ra...@apache.org>
Authored: Tue Sep 5 23:34:57 2017 +0530
Committer: Rajeshbabu Chintaguntla <ra...@apache.org>
Committed: Tue Sep 5 23:34:57 2017 +0530
----------------------------------------------------------------------
.../phoenix/end2end/FlappingLocalIndexIT.java | 79 +++++++++++++++++++-
.../phoenix/end2end/index/BaseLocalIndexIT.java | 6 +-
.../phoenix/end2end/index/LocalIndexIT.java | 3 +-
.../UngroupedAggregateRegionObserver.java | 42 ++++++++++-
.../phoenix/iterate/BaseResultIterators.java | 50 +++++++++----
5 files changed, 159 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8cbb5e5/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java
index 7509997..e2f3970 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java
@@ -21,22 +21,31 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.end2end.index.BaseLocalIndexIT;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
@@ -297,4 +306,72 @@ public class FlappingLocalIndexIT extends BaseLocalIndexIT {
indexTable.close();
}
-}
+ @Test
+ public void testBuildingLocalIndexShouldHandleNoSuchColumnFamilyException() throws Exception {
+ testBuildingLocalIndexShouldHandleNoSuchColumnFamilyException(false);
+ }
+
+ @Test
+ public void testBuildingLocalCoveredIndexShouldHandleNoSuchColumnFamilyException() throws Exception {
+ testBuildingLocalIndexShouldHandleNoSuchColumnFamilyException(true);
+ }
+
+ private void testBuildingLocalIndexShouldHandleNoSuchColumnFamilyException(boolean coveredIndex) throws Exception {
+ String tableName = schemaName + "." + generateUniqueName();
+ String indexName = "IDX_" + generateUniqueName();
+ String indexTableName = schemaName + "." + indexName;
+ TableName physicalTableName = SchemaUtil.getPhysicalTableName(tableName.getBytes(), isNamespaceMapped);
+
+ createBaseTable(tableName, null, null, coveredIndex ? "cf" : null);
+ Connection conn1 = DriverManager.getConnection(getUrl());
+ conn1.createStatement().execute("UPSERT INTO "+tableName+" values('b',1,2,4,'z')");
+ conn1.createStatement().execute("UPSERT INTO "+tableName+" values('f',1,2,3,'z')");
+ conn1.createStatement().execute("UPSERT INTO "+tableName+" values('j',2,4,2,'a')");
+ conn1.createStatement().execute("UPSERT INTO "+tableName+" values('q',3,1,1,'c')");
+ conn1.commit();
+ HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
+ HTableDescriptor tableDescriptor = admin.getTableDescriptor(physicalTableName);
+ tableDescriptor.addCoprocessor(DeleyOpenRegionObserver.class.getName(), null,
+ QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY - 1, null);
+ admin.disableTable(physicalTableName);
+ admin.modifyTable(physicalTableName, tableDescriptor);
+ admin.enableTable(physicalTableName);
+ DeleyOpenRegionObserver.DELAY_OPEN = true;
+ conn1.createStatement().execute(
+ "CREATE LOCAL INDEX " + indexName + " ON " + tableName + "(k3)"
+ + (coveredIndex ? " include(cf.v1)" : ""));
+ DeleyOpenRegionObserver.DELAY_OPEN = false;
+ ResultSet rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + indexTableName);
+ assertTrue(rs.next());
+ assertEquals(4, rs.getInt(1));
+ }
+
+ public static class DeleyOpenRegionObserver extends BaseRegionObserver {
+ public static volatile boolean DELAY_OPEN = false;
+ private int retryCount = 0;
+ private CountDownLatch latch = new CountDownLatch(1);
+ @Override
+ public void
+ preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested)
+ throws IOException {
+ if(DELAY_OPEN) {
+ try {
+ latch.await();
+ } catch (InterruptedException e1) {
+ throw new DoNotRetryIOException(e1);
+ }
+ }
+ super.preClose(c, abortRequested);
+ }
+
+ @Override
+ public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e,
+ Scan scan, RegionScanner s) throws IOException {
+ if(DELAY_OPEN && retryCount == 1) {
+ latch.countDown();
+ }
+ retryCount++;
+ return super.preScannerOpen(e, scan, s);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8cbb5e5/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseLocalIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseLocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseLocalIndexIT.java
index 547878c..30baec4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseLocalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseLocalIndexIT.java
@@ -69,6 +69,10 @@ public abstract class BaseLocalIndexIT extends BaseUniqueNamesOwnClusterIT {
}
protected void createBaseTable(String tableName, Integer saltBuckets, String splits) throws SQLException {
+ createBaseTable(tableName, saltBuckets, splits, null);
+ }
+
+ protected void createBaseTable(String tableName, Integer saltBuckets, String splits, String cf) throws SQLException {
Connection conn = getConnection();
if (isNamespaceMapped) {
conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
@@ -77,7 +81,7 @@ public abstract class BaseLocalIndexIT extends BaseUniqueNamesOwnClusterIT {
"k1 INTEGER NOT NULL,\n" +
"k2 INTEGER NOT NULL,\n" +
"k3 INTEGER,\n" +
- "v1 VARCHAR,\n" +
+ (cf != null ? (cf+'.') : "") + "v1 VARCHAR,\n" +
"CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))\n"
+ (saltBuckets != null && splits == null ? (" salt_buckets=" + saltBuckets) : ""
+ (saltBuckets == null && splits != null ? (" split on " + splits) : ""));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8cbb5e5/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
index 27edfb7..48221ab 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
@@ -75,8 +75,7 @@ public class LocalIndexIT extends BaseLocalIndexIT {
super(isNamespaceMapped);
}
- @Ignore
- //FIXME: PHOENIX-3496
+ @Test
public void testLocalIndexRoundTrip() throws Exception {
String tableName = schemaName + "." + generateUniqueName();
String indexName = "IDX_" + generateUniqueName();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8cbb5e5/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index afe0ccf..31b8e36 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -46,10 +46,12 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
@@ -90,6 +92,7 @@ import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PRow;
import org.apache.phoenix.schema.PTable;
@@ -121,6 +124,7 @@ import org.apache.phoenix.util.KeyValueUtil;
import org.apache.phoenix.util.LogUtil;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.StringUtil;
import org.apache.phoenix.util.TimeKeeper;
@@ -276,10 +280,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
throws IOException {
s = super.preScannerOpen(e, scan, s);
if (ScanUtil.isAnalyzeTable(scan)) {
-// if (!ScanUtil.isLocalIndex(scan)) {
-// scan.getFamilyMap().clear();
-// }
-// scan.getFamilyMap().clear();
// We are setting the start row and stop row such that it covers the entire region. As part
// of PHOENIX-1263 we are storing the guideposts against the physical table rather than
// individual tenant specific tables.
@@ -448,6 +448,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER)-1) ;
boolean buildLocalIndex = indexMaintainers != null && dataColumns==null && !localIndexScan;
+ if(buildLocalIndex) {
+ checkForLocalIndexColumnFamilies(region, indexMaintainers);
+ }
if (isDescRowKeyOrderUpgrade || isDelete || isUpsert || (deleteCQ != null && deleteCF != null) || emptyCF != null || buildLocalIndex) {
needToWrite = true;
maxBatchSize = conf.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
@@ -791,6 +794,37 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
+ private void checkForLocalIndexColumnFamilies(Region region,
+ List<IndexMaintainer> indexMaintainers) throws IOException {
+ HTableDescriptor tableDesc = region.getTableDesc();
+ String schemaName =
+ tableDesc.getTableName().getNamespaceAsString()
+ .equals(NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR) ? SchemaUtil
+ .getSchemaNameFromFullName(tableDesc.getTableName().getNameAsString())
+ : tableDesc.getTableName().getNamespaceAsString();
+ String tableName = SchemaUtil.getTableNameFromFullName(tableDesc.getTableName().getNameAsString());
+ for (IndexMaintainer indexMaintainer : indexMaintainers) {
+ Set<ColumnReference> coveredColumns = indexMaintainer.getCoveredColumns();
+ if(coveredColumns.isEmpty()) {
+ byte[] localIndexCf = indexMaintainer.getEmptyKeyValueFamily().get();
+ // When there are no covered columns, the index data is stored in the default column family, so verify that it exists.
+ if (tableDesc.getFamily(localIndexCf) == null) {
+ ServerUtil.throwIOException("Column Family Not Found",
+ new ColumnFamilyNotFoundException(schemaName, tableName, Bytes
+ .toString(localIndexCf)));
+ }
+ }
+ for (ColumnReference reference : coveredColumns) {
+ byte[] cf = IndexUtil.getLocalIndexColumnFamily(reference.getFamily());
+ HColumnDescriptor family = region.getTableDesc().getFamily(cf);
+ if (family == null) {
+ ServerUtil.throwIOException("Column Family Not Found",
+ new ColumnFamilyNotFoundException(schemaName, tableName, Bytes.toString(cf)));
+ }
+ }
+ }
+ }
+
private void commit(Region region, List<Mutation> mutations, byte[] indexUUID, long blockingMemStoreSize,
byte[] indexMaintainersPtr, byte[] txState, HTable targetHTable, boolean useIndexProto,
boolean isPKChanging)
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8cbb5e5/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 6ab5dc3..98f5d46 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.iterate;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.LOCAL_INDEX_BUILD;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX;
@@ -86,6 +87,7 @@ import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
import org.apache.phoenix.schema.PColumnFamily;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
@@ -899,24 +901,23 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
try { // Rethrow as SQLException
throw ServerUtil.parseServerException(e);
} catch (StaleRegionBoundaryCacheException e2) {
- scanPairItr.remove();
// Catch only to try to recover from region boundary cache being out of date
if (!clearedCache) { // Clear cache once so that we rejigger job based on new boundaries
services.clearTableRegionCache(physicalTableName);
context.getOverallQueryMetrics().cacheRefreshedDueToSplits();
}
- // Resubmit just this portion of work again
- Scan oldScan = scanPair.getFirst();
- byte[] startKey = oldScan.getAttribute(SCAN_ACTUAL_START_ROW);
- byte[] endKey = oldScan.getStopRow();
-
- List<List<Scan>> newNestedScans = this.getParallelScans(startKey, endKey);
- // Add any concatIterators that were successful so far
- // as we need these to be in order
- addIterator(iterators, concatIterators);
- concatIterators = Lists.newArrayList();
- getIterators(newNestedScans, services, isLocalIndex, allIterators, iterators, isReverse,
- maxQueryEndTime, newNestedScans.size(), previousScan);
+ concatIterators =
+ recreateIterators(services, isLocalIndex, allIterators,
+ iterators, isReverse, maxQueryEndTime, previousScan,
+ clearedCache, concatIterators, scanPairItr, scanPair);
+ } catch(ColumnFamilyNotFoundException cfnfe) {
+ if (scanPair.getFirst().getAttribute(LOCAL_INDEX_BUILD) != null) {
+ Thread.sleep(1000);
+ concatIterators =
+ recreateIterators(services, isLocalIndex, allIterators,
+ iterators, isReverse, maxQueryEndTime, previousScan,
+ clearedCache, concatIterators, scanPairItr, scanPair);
+ }
}
}
}
@@ -968,6 +969,29 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
}
return null; // Not reachable
}
+
+ private List<PeekingResultIterator> recreateIterators(ConnectionQueryServices services,
+ boolean isLocalIndex, Queue<PeekingResultIterator> allIterators,
+ List<PeekingResultIterator> iterators, boolean isReverse, long maxQueryEndTime,
+ ScanWrapper previousScan, boolean clearedCache,
+ List<PeekingResultIterator> concatIterators,
+ Iterator<Pair<Scan, Future<PeekingResultIterator>>> scanPairItr,
+ Pair<Scan, Future<PeekingResultIterator>> scanPair) throws SQLException {
+ scanPairItr.remove();
+ // Resubmit just this portion of work again
+ Scan oldScan = scanPair.getFirst();
+ byte[] startKey = oldScan.getAttribute(SCAN_ACTUAL_START_ROW);
+ byte[] endKey = oldScan.getStopRow();
+
+ List<List<Scan>> newNestedScans = this.getParallelScans(startKey, endKey);
+ // Add any concatIterators that were successful so far
+ // as we need these to be in order
+ addIterator(iterators, concatIterators);
+ concatIterators = Lists.newArrayList();
+ getIterators(newNestedScans, services, isLocalIndex, allIterators, iterators, isReverse,
+ maxQueryEndTime, newNestedScans.size(), previousScan);
+ return concatIterators;
+ }
@Override