Posted to commits@phoenix.apache.org by ra...@apache.org on 2017/09/05 18:05:13 UTC

phoenix git commit: PHOENIX-3496 Figure out why LocalIndexIT#testLocalIndexRoundTrip is flapping (Rajeshbabu)

Repository: phoenix
Updated Branches:
  refs/heads/master 0a3ef6c1b -> c8cbb5e5e


PHOENIX-3496 Figure out why LocalIndexIT#testLocalIndexRoundTrip is flapping (Rajeshbabu)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c8cbb5e5
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c8cbb5e5
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c8cbb5e5

Branch: refs/heads/master
Commit: c8cbb5e5e196299d5cc50385bd5ebb3791170d2f
Parents: 0a3ef6c
Author: Rajeshbabu Chintaguntla <ra...@apache.org>
Authored: Tue Sep 5 23:34:57 2017 +0530
Committer: Rajeshbabu Chintaguntla <ra...@apache.org>
Committed: Tue Sep 5 23:34:57 2017 +0530

----------------------------------------------------------------------
 .../phoenix/end2end/FlappingLocalIndexIT.java   | 79 +++++++++++++++++++-
 .../phoenix/end2end/index/BaseLocalIndexIT.java |  6 +-
 .../phoenix/end2end/index/LocalIndexIT.java     |  3 +-
 .../UngroupedAggregateRegionObserver.java       | 42 ++++++++++-
 .../phoenix/iterate/BaseResultIterators.java    | 50 +++++++++----
 5 files changed, 159 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8cbb5e5/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java
index 7509997..e2f3970 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java
@@ -21,22 +21,31 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
 
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.end2end.index.BaseLocalIndexIT;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
@@ -297,4 +306,72 @@ public class FlappingLocalIndexIT extends BaseLocalIndexIT {
         indexTable.close();
     }
 
-}
+    @Test
+    public void testBuildingLocalIndexShouldHandleNoSuchColumnFamilyException() throws Exception {
+        testBuildingLocalIndexShouldHandleNoSuchColumnFamilyException(false);
+    }
+
+    @Test
+    public void testBuildingLocalCoveredIndexShouldHandleNoSuchColumnFamilyException() throws Exception {
+        testBuildingLocalIndexShouldHandleNoSuchColumnFamilyException(true);
+    }
+
+    private void testBuildingLocalIndexShouldHandleNoSuchColumnFamilyException(boolean coveredIndex) throws Exception {
+        String tableName = schemaName + "." + generateUniqueName();
+        String indexName = "IDX_" + generateUniqueName();
+        String indexTableName = schemaName + "." + indexName;
+        TableName physicalTableName = SchemaUtil.getPhysicalTableName(tableName.getBytes(), isNamespaceMapped);
+
+        createBaseTable(tableName, null, null, coveredIndex ? "cf" : null);
+        Connection conn1 = DriverManager.getConnection(getUrl());
+        conn1.createStatement().execute("UPSERT INTO "+tableName+" values('b',1,2,4,'z')");
+        conn1.createStatement().execute("UPSERT INTO "+tableName+" values('f',1,2,3,'z')");
+        conn1.createStatement().execute("UPSERT INTO "+tableName+" values('j',2,4,2,'a')");
+        conn1.createStatement().execute("UPSERT INTO "+tableName+" values('q',3,1,1,'c')");
+        conn1.commit();
+        HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
+        HTableDescriptor tableDescriptor = admin.getTableDescriptor(physicalTableName);
+        tableDescriptor.addCoprocessor(DeleyOpenRegionObserver.class.getName(), null,
+            QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY - 1, null);
+        admin.disableTable(physicalTableName);
+        admin.modifyTable(physicalTableName, tableDescriptor);
+        admin.enableTable(physicalTableName);
+        DeleyOpenRegionObserver.DELAY_OPEN = true;
+        conn1.createStatement().execute(
+            "CREATE LOCAL INDEX " + indexName + " ON " + tableName + "(k3)"
+                    + (coveredIndex ? " include(cf.v1)" : ""));
+        DeleyOpenRegionObserver.DELAY_OPEN = false;
+        ResultSet rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + indexTableName);
+        assertTrue(rs.next());
+        assertEquals(4, rs.getInt(1));
+    }
+
+    public static class DeleyOpenRegionObserver extends BaseRegionObserver {
+        public static volatile boolean DELAY_OPEN = false;
+        private int retryCount = 0;
+        private CountDownLatch latch = new CountDownLatch(1);
+        @Override
+        public void
+                preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested)
+                        throws IOException {
+            if(DELAY_OPEN) {
+                try {
+                    latch.await();
+                } catch (InterruptedException e1) {
+                    throw new DoNotRetryIOException(e1);
+                }
+            }
+            super.preClose(c, abortRequested);
+        }
+
+        @Override
+        public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e,
+                Scan scan, RegionScanner s) throws IOException {
+            if(DELAY_OPEN && retryCount == 1) {
+                latch.countDown();
+            }
+            retryCount++;
+            return super.preScannerOpen(e, scan, s);
+        }
+    }
+}
\ No newline at end of file
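
Note: DeleyOpenRegionObserver.DELAY_OPEN above is a static flag shared by every test running in the same JVM. A slightly hardened variant of the toggle in the new test (a sketch only, not part of this commit) would reset the flag in a finally block so a failed CREATE LOCAL INDEX cannot leave it set:

    // Sketch only: the same statements as in the test above, with the reset of
    // DELAY_OPEN guarded by a finally block.
    DeleyOpenRegionObserver.DELAY_OPEN = true;
    try {
        conn1.createStatement().execute(
            "CREATE LOCAL INDEX " + indexName + " ON " + tableName + "(k3)"
                    + (coveredIndex ? " include(cf.v1)" : ""));
    } finally {
        DeleyOpenRegionObserver.DELAY_OPEN = false;
    }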

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8cbb5e5/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseLocalIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseLocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseLocalIndexIT.java
index 547878c..30baec4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseLocalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseLocalIndexIT.java
@@ -69,6 +69,10 @@ public abstract class BaseLocalIndexIT extends BaseUniqueNamesOwnClusterIT {
     }
 
     protected void createBaseTable(String tableName, Integer saltBuckets, String splits) throws SQLException {
+        createBaseTable(tableName, saltBuckets, splits, null);
+    }
+
+    protected void createBaseTable(String tableName, Integer saltBuckets, String splits, String cf) throws SQLException {
         Connection conn = getConnection();
         if (isNamespaceMapped) {
             conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
@@ -77,7 +81,7 @@ public abstract class BaseLocalIndexIT extends BaseUniqueNamesOwnClusterIT {
                 "k1 INTEGER NOT NULL,\n" +
                 "k2 INTEGER NOT NULL,\n" +
                 "k3 INTEGER,\n" +
-                "v1 VARCHAR,\n" +
+                (cf != null ? (cf+'.') : "") + "v1 VARCHAR,\n" +
                 "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))\n"
                         + (saltBuckets != null && splits == null ? (" salt_buckets=" + saltBuckets) : ""
                         + (saltBuckets == null && splits != null ? (" split on " + splits) : ""));
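
Note: the new four-argument createBaseTable simply prefixes the v1 column with the supplied family name. A rough illustration with a hypothetical table name (sketch only):

    // Illustration only: effect of the optional column family argument.
    createBaseTable("S.T1", null, null, "cf");
    //   -> ... k3 INTEGER, cf.v1 VARCHAR, CONSTRAINT pk PRIMARY KEY (t_id, k1, k2) ...
    createBaseTable("S.T1", null, null, null);
    //   -> ... k3 INTEGER, v1 VARCHAR, CONSTRAINT pk PRIMARY KEY (t_id, k1, k2) ...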

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8cbb5e5/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
index 27edfb7..48221ab 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
@@ -75,8 +75,7 @@ public class LocalIndexIT extends BaseLocalIndexIT {
         super(isNamespaceMapped);
     }
     
-    @Ignore
-    //FIXME: PHOENIX-3496 
+    @Test
     public void testLocalIndexRoundTrip() throws Exception {
         String tableName = schemaName + "." + generateUniqueName();
         String indexName = "IDX_" + generateUniqueName();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8cbb5e5/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index afe0ccf..31b8e36 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -46,10 +46,12 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
@@ -90,6 +92,7 @@ import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PRow;
 import org.apache.phoenix.schema.PTable;
@@ -121,6 +124,7 @@ import org.apache.phoenix.util.KeyValueUtil;
 import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.StringUtil;
 import org.apache.phoenix.util.TimeKeeper;
@@ -276,10 +280,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             throws IOException {
         s = super.preScannerOpen(e, scan, s);
         if (ScanUtil.isAnalyzeTable(scan)) {
-//            if (!ScanUtil.isLocalIndex(scan)) {
-//                scan.getFamilyMap().clear();
-//            }
-//            scan.getFamilyMap().clear();
             // We are setting the start row and stop row such that it covers the entire region. As part
             // of Phoenix-1263 we are storing the guideposts against the physical table rather than
             // individual tenant specific tables.
@@ -448,6 +448,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                         HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER)-1) ;
 
         boolean buildLocalIndex = indexMaintainers != null && dataColumns==null && !localIndexScan;
+        if(buildLocalIndex) {
+            checkForLocalIndexColumnFamilies(region, indexMaintainers);
+        }
         if (isDescRowKeyOrderUpgrade || isDelete || isUpsert || (deleteCQ != null && deleteCF != null) || emptyCF != null || buildLocalIndex) {
             needToWrite = true;
             maxBatchSize = conf.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
@@ -791,6 +794,37 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
 
     }
 
+    private void checkForLocalIndexColumnFamilies(Region region,
+            List<IndexMaintainer> indexMaintainers) throws IOException {
+        HTableDescriptor tableDesc = region.getTableDesc();
+        String schemaName =
+                tableDesc.getTableName().getNamespaceAsString()
+                        .equals(NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR) ? SchemaUtil
+                        .getSchemaNameFromFullName(tableDesc.getTableName().getNameAsString())
+                        : tableDesc.getTableName().getNamespaceAsString();
+        String tableName = SchemaUtil.getTableNameFromFullName(tableDesc.getTableName().getNameAsString());
+        for (IndexMaintainer indexMaintainer : indexMaintainers) {
+            Set<ColumnReference> coveredColumns = indexMaintainer.getCoveredColumns();
+            if(coveredColumns.isEmpty()) {
+                byte[] localIndexCf = indexMaintainer.getEmptyKeyValueFamily().get();
+                // When the covered columns are empty, the index data is stored in the default column family, so check for it.
+                if (tableDesc.getFamily(localIndexCf) == null) {
+                    ServerUtil.throwIOException("Column Family Not Found",
+                        new ColumnFamilyNotFoundException(schemaName, tableName, Bytes
+                                .toString(localIndexCf)));
+                }
+            }
+            for (ColumnReference reference : coveredColumns) {
+                byte[] cf = IndexUtil.getLocalIndexColumnFamily(reference.getFamily());
+                HColumnDescriptor family = region.getTableDesc().getFamily(cf);
+                if (family == null) {
+                    ServerUtil.throwIOException("Column Family Not Found",
+                        new ColumnFamilyNotFoundException(schemaName, tableName, Bytes.toString(cf)));
+                }
+            }
+        }
+    }
+
     private void commit(Region region, List<Mutation> mutations, byte[] indexUUID, long blockingMemStoreSize,
             byte[] indexMaintainersPtr, byte[] txState, HTable targetHTable, boolean useIndexProto,
                         boolean isPKChanging)
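
Note: a quick worked example of the schema/table name derivation in checkForLocalIndexColumnFamilies above, using hypothetical names "S1" and "T1" (sketch only; relies on the HBase TableName API and the SchemaUtil helpers already imported in this file):

    // Namespace-mapped physical table "S1:T1": the namespace is "S1", not
    // "default", so schemaName = "S1" and tableName = "T1".
    TableName nsMapped = TableName.valueOf("S1", "T1");
    // Non-namespace-mapped physical table "S1.T1": the namespace is "default",
    // so both names are split out of the qualifier instead:
    TableName defaultNs = TableName.valueOf("S1.T1");
    //   SchemaUtil.getSchemaNameFromFullName("S1.T1") -> "S1"
    //   SchemaUtil.getTableNameFromFullName("S1.T1")  -> "T1"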

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8cbb5e5/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 6ab5dc3..98f5d46 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.iterate;
 
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.LOCAL_INDEX_BUILD;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX;
@@ -86,6 +87,7 @@ import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
 import org.apache.phoenix.schema.PColumnFamily;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
@@ -899,24 +901,23 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                         try { // Rethrow as SQLException
                             throw ServerUtil.parseServerException(e);
                         } catch (StaleRegionBoundaryCacheException e2) {
-                            scanPairItr.remove();
                             // Catch only to try to recover from region boundary cache being out of date
                             if (!clearedCache) { // Clear cache once so that we rejigger job based on new boundaries
                                 services.clearTableRegionCache(physicalTableName);
                                 context.getOverallQueryMetrics().cacheRefreshedDueToSplits();
                             }
-                            // Resubmit just this portion of work again
-                            Scan oldScan = scanPair.getFirst();
-                            byte[] startKey = oldScan.getAttribute(SCAN_ACTUAL_START_ROW);
-                            byte[] endKey = oldScan.getStopRow();
-                            
-                            List<List<Scan>> newNestedScans = this.getParallelScans(startKey, endKey);
-                            // Add any concatIterators that were successful so far
-                            // as we need these to be in order
-                            addIterator(iterators, concatIterators);
-                            concatIterators = Lists.newArrayList();
-                            getIterators(newNestedScans, services, isLocalIndex, allIterators, iterators, isReverse,
-                                    maxQueryEndTime, newNestedScans.size(), previousScan);
+                            concatIterators =
+                                    recreateIterators(services, isLocalIndex, allIterators,
+                                        iterators, isReverse, maxQueryEndTime, previousScan,
+                                        clearedCache, concatIterators, scanPairItr, scanPair);
+                        } catch(ColumnFamilyNotFoundException cfnfe) {
+                            if (scanPair.getFirst().getAttribute(LOCAL_INDEX_BUILD) != null) {
+                                Thread.sleep(1000);
+                                concatIterators =
+                                        recreateIterators(services, isLocalIndex, allIterators,
+                                            iterators, isReverse, maxQueryEndTime, previousScan,
+                                            clearedCache, concatIterators, scanPairItr, scanPair);
+                            }
                         }
                     }
                 }
@@ -968,6 +969,29 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         }
         return null; // Not reachable
     }
+
+    private List<PeekingResultIterator> recreateIterators(ConnectionQueryServices services,
+            boolean isLocalIndex, Queue<PeekingResultIterator> allIterators,
+            List<PeekingResultIterator> iterators, boolean isReverse, long maxQueryEndTime,
+            ScanWrapper previousScan, boolean clearedCache,
+            List<PeekingResultIterator> concatIterators,
+            Iterator<Pair<Scan, Future<PeekingResultIterator>>> scanPairItr,
+            Pair<Scan, Future<PeekingResultIterator>> scanPair) throws SQLException {
+        scanPairItr.remove();
+        // Resubmit just this portion of work again
+        Scan oldScan = scanPair.getFirst();
+        byte[] startKey = oldScan.getAttribute(SCAN_ACTUAL_START_ROW);
+        byte[] endKey = oldScan.getStopRow();
+
+        List<List<Scan>> newNestedScans = this.getParallelScans(startKey, endKey);
+        // Add any concatIterators that were successful so far
+        // as we need these to be in order
+        addIterator(iterators, concatIterators);
+        concatIterators = Lists.newArrayList();
+        getIterators(newNestedScans, services, isLocalIndex, allIterators, iterators, isReverse,
+                maxQueryEndTime, newNestedScans.size(), previousScan);
+        return concatIterators;
+    }
     
 
     @Override