Posted to issues@phoenix.apache.org by GitBox <gi...@apache.org> on 2020/10/08 04:14:53 UTC

[GitHub] [phoenix] kadirozde opened a new pull request #915: PHOENIX-6181 IndexRepairRegionScanner to verify and repair every glob…

kadirozde opened a new pull request #915:
URL: https://github.com/apache/phoenix/pull/915


   …al index row


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [phoenix] kadirozde merged pull request #915: PHOENIX-6181 IndexRepairRegionScanner to verify and repair every glob…

Posted by GitBox <gi...@apache.org>.
kadirozde merged pull request #915:
URL: https://github.com/apache/phoenix/pull/915


   





[GitHub] [phoenix] stoty commented on pull request #915: PHOENIX-6181 IndexRepairRegionScanner to verify and repair every glob…

Posted by GitBox <gi...@apache.org>.
stoty commented on pull request #915:
URL: https://github.com/apache/phoenix/pull/915#issuecomment-705327370


   :broken_heart: **-1 overall**
   
   
   
   
   
   
   | Vote | Subsystem | Runtime | Comment |
   |:----:|----------:|--------:|:--------|
   | +0 :ok: |  reexec  |   6m 24s |  Docker mode activated.  |
   ||| _ Prechecks _ |
   | +1 :green_heart: |  dupname  |   0m  0s |  No case conflicting files found.  |
   | +1 :green_heart: |  hbaseanti  |   0m  0s |  Patch does not have any anti-patterns.  |
   | +1 :green_heart: |  @author  |   0m  0s |  The patch does not contain any @author tags.  |
   | +1 :green_heart: |  test4tests  |   0m  0s |  The patch appears to include 2 new or modified test files.  |
   ||| _ 4.x Compile Tests _ |
   | +1 :green_heart: |  mvninstall  |   8m 39s |  4.x passed  |
   | +1 :green_heart: |  compile  |   0m 53s |  4.x passed  |
   | +1 :green_heart: |  checkstyle  |   1m 13s |  4.x passed  |
   | +1 :green_heart: |  javadoc  |   0m 45s |  4.x passed  |
   | +0 :ok: |  spotbugs  |   2m 54s |  phoenix-core in 4.x has 957 extant spotbugs warnings.  |
   ||| _ Patch Compile Tests _ |
   | +1 :green_heart: |  mvninstall  |   4m 54s |  the patch passed  |
   | +1 :green_heart: |  compile  |   0m 56s |  the patch passed  |
   | +1 :green_heart: |  javac  |   0m 56s |  the patch passed  |
   | -1 :x: |  checkstyle  |   1m 21s |  phoenix-core: The patch generated 542 new + 1433 unchanged - 372 fixed = 1975 total (was 1805)  |
   | +1 :green_heart: |  whitespace  |   0m  0s |  The patch has no whitespace issues.  |
   | +1 :green_heart: |  javadoc  |   0m 41s |  the patch passed  |
   | -1 :x: |  spotbugs  |   3m  3s |  phoenix-core generated 1 new + 956 unchanged - 1 fixed = 957 total (was 957)  |
   ||| _ Other Tests _ |
   | -1 :x: |  unit  |   1m 12s |  phoenix-core in the patch failed.  |
   | +1 :green_heart: |  asflicense  |   0m 10s |  The patch does not generate ASF License warnings.  |
   |  |   |  34m 35s |   |
   
   
   | Reason | Tests |
   |-------:|:------|
   | FindBugs | module:phoenix-core |
   |  |  org.apache.phoenix.coprocessor.GlobalIndexRegionScanner.shouldVerify(IndexTool$IndexVerifyType, byte[], Scan, Region, IndexMaintainer, IndexVerificationResultRepository, boolean) may expose internal representation by storing an externally mutable object into GlobalIndexRegionScanner.indexRowKeyforReadRepair  At GlobalIndexRegionScanner.java:[line 328] |
   | Failed junit tests | phoenix.index.ShouldVerifyTest |
   
   
   | Subsystem | Report/Notes |
   |----------:|:-------------|
   | Docker | ClientAPI=1.40 ServerAPI=1.40 base: https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-PreCommit-GitHub-PR/job/PR-915/1/artifact/yetus-general-check/output/Dockerfile |
   | GITHUB PR | https://github.com/apache/phoenix/pull/915 |
   | Optional Tests | dupname asflicense javac javadoc unit spotbugs hbaseanti checkstyle compile |
   | uname | Linux b294a7ca6906 4.15.0-112-generic #113-Ubuntu SMP Thu Jul 9 23:41:39 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux |
   | Build tool | maven |
   | Personality | dev/phoenix-personality.sh |
   | git revision | 4.x / 3008ca9 |
   | Default Java | Private Build-1.8.0_242-8u242-b08-0ubuntu3~16.04-b08 |
   | checkstyle | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-PreCommit-GitHub-PR/job/PR-915/1/artifact/yetus-general-check/output/diff-checkstyle-phoenix-core.txt |
   | spotbugs | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-PreCommit-GitHub-PR/job/PR-915/1/artifact/yetus-general-check/output/new-spotbugs-phoenix-core.html |
   | unit | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-PreCommit-GitHub-PR/job/PR-915/1/artifact/yetus-general-check/output/patch-unit-phoenix-core.txt |
   |  Test Results | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-PreCommit-GitHub-PR/job/PR-915/1/testReport/ |
   | Max. process+thread count | 487 (vs. ulimit of 30000) |
   | modules | C: phoenix-core U: phoenix-core |
   | Console output | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-PreCommit-GitHub-PR/job/PR-915/1/console |
   | versions | git=2.7.4 maven=3.3.9 spotbugs=4.1.3 |
   | Powered by | Apache Yetus 0.12.0 https://yetus.apache.org |
   
   
   This message was automatically generated.
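   [Editor's note] The new SpotBugs warning in the report above (EI-style "may expose internal representation") is about `shouldVerify` storing the caller's mutable `byte[]` directly into the field `indexRowKeyforReadRepair`. A minimal self-contained sketch of the flagged pattern and the usual defensive-copy fix (hypothetical class, not the actual Phoenix source):

```java
import java.util.Arrays;

public class ExposeRepSketch {
    private byte[] indexRowKeyforReadRepair;

    // Flagged pattern: stores the caller's array directly, so a later
    // external mutation of that array silently changes this object's state.
    public void setUnsafe(byte[] indexRowKey) {
        this.indexRowKeyforReadRepair = indexRowKey;
    }

    // Defensive-copy fix: copy the bytes on the way in.
    public void setSafe(byte[] indexRowKey) {
        this.indexRowKeyforReadRepair =
                (indexRowKey == null) ? null : Arrays.copyOf(indexRowKey, indexRowKey.length);
    }

    public byte[] get() {
        return indexRowKeyforReadRepair;
    }
}
```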
   
   





[GitHub] [phoenix] kadirozde commented on a change in pull request #915: PHOENIX-6181 IndexRepairRegionScanner to verify and repair every glob…

Posted by GitBox <gi...@apache.org>.
kadirozde commented on a change in pull request #915:
URL: https://github.com/apache/phoenix/pull/915#discussion_r506685882



##########
File path: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
##########
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME;
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.filter.SkipScanFilter;
+import org.apache.phoenix.hbase.index.parallel.Task;
+import org.apache.phoenix.hbase.index.parallel.TaskBatch;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+public class IndexRepairRegionScanner extends GlobalIndexRegionScanner {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(IndexRepairRegionScanner.class);
+
+    public IndexRepairRegionScanner(final RegionScanner innerScanner,
+                                     final Region region,
+                                     final Scan scan,
+                                     final RegionCoprocessorEnvironment env,
+                                     final UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver)
+            throws IOException {
+        super(innerScanner, region, scan, env, ungroupedAggregateRegionObserver);
+
+        byte[] dataTableName = scan.getAttribute(PHYSICAL_DATA_TABLE_NAME);
+        dataHTable = hTableFactory.getTable(new ImmutableBytesPtr(dataTableName));
+        indexTableTTL = region.getTableDesc().getColumnFamilies()[0].getTimeToLive();
+        try (org.apache.hadoop.hbase.client.Connection connection =
+                     HBaseFactoryProvider.getHConnectionFactory().createConnection(env.getConfiguration())) {
+            regionEndKeys = connection.getRegionLocator(dataHTable.getName()).getEndKeys();
+        }
+    }
+
+    public void prepareExpectedIndexMutations(Result dataRow, Map<byte[], List<Mutation>> expectedIndexMutationMap) throws IOException {
+        Put put = null;
+        Delete del = null;
+        for (Cell cell : dataRow.rawCells()) {
+            if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
+                if (put == null) {
+                    put = new Put(CellUtil.cloneRow(cell));
+                }
+                put.add(cell);
+            } else {
+                if (del == null) {
+                    del = new Delete(CellUtil.cloneRow(cell));
+                }
+                del.addDeleteMarker(cell);
+            }
+        }
+        List<Mutation> indexMutations = prepareIndexMutationsForRebuild(indexMaintainer, put, del);
+        Collections.reverse(indexMutations);
+        for (Mutation mutation : indexMutations) {
+            byte[] indexRowKey = mutation.getRow();
+            List<Mutation> mutationList = expectedIndexMutationMap.get(indexRowKey);
+            if (mutationList == null) {
+                mutationList = new ArrayList<>();
+                mutationList.add(mutation);
+                expectedIndexMutationMap.put(indexRowKey, mutationList);
+            } else {
+                mutationList.add(mutation);
+            }
+        }
+    }
+
+    private void repairIndexRows(Map<byte[], List<Mutation>> indexMutationMap,
+                                 List<Mutation> indexRowsToBeDeleted,
+                                 IndexToolVerificationResult verificationResult) throws IOException {
+        try {
+            int batchSize = 0;
+            List<Mutation> indexUpdates = new ArrayList<Mutation>(maxBatchSize);
+            for (List<Mutation> mutationList : indexMutationMap.values()) {
+                indexUpdates.addAll(mutationList);
+                batchSize += mutationList.size();
+                if (batchSize >= maxBatchSize) {
+                    ungroupedAggregateRegionObserver.checkForRegionClosing();
+                    region.batchMutate(indexUpdates.toArray(new Mutation[indexUpdates.size()]),
+                            HConstants.NO_NONCE, HConstants.NO_NONCE);
+                    batchSize = 0;
+                    indexUpdates = new ArrayList<Mutation>(maxBatchSize);
+                }
+            }
+            if (batchSize > 0) {
+                ungroupedAggregateRegionObserver.checkForRegionClosing();
+                region.batchMutate(indexUpdates.toArray(new Mutation[indexUpdates.size()]),
+                        HConstants.NO_NONCE, HConstants.NO_NONCE);
+            }
+            batchSize = 0;
+            indexUpdates = new ArrayList<Mutation>(maxBatchSize);
+            for (Mutation mutation : indexRowsToBeDeleted) {
+                indexUpdates.add(mutation);
+                batchSize ++;
+                if (batchSize >= maxBatchSize) {
+                    ungroupedAggregateRegionObserver.checkForRegionClosing();
+                    region.batchMutate(indexUpdates.toArray(new Mutation[indexUpdates.size()]),
+                            HConstants.NO_NONCE, HConstants.NO_NONCE);
+                    batchSize = 0;
+                    indexUpdates = new ArrayList<Mutation>(maxBatchSize);
+                }
+            }
+            if (batchSize > 0) {
+                ungroupedAggregateRegionObserver.checkForRegionClosing();
+                region.batchMutate(indexUpdates.toArray(new Mutation[indexUpdates.size()]),
+                        HConstants.NO_NONCE, HConstants.NO_NONCE);

Review comment:
       Done
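   [Editor's note] For context, the grouping step in `prepareExpectedIndexMutations` quoted above collects mutations per index row key with a get-then-put pattern; that pattern is equivalent to `Map.computeIfAbsent`. A self-contained sketch with `String` keys (the real map is keyed by `byte[]`, which needs a content-aware map such as a `TreeMap` with HBase's `Bytes.BYTES_COMPARATOR`, since a plain `HashMap<byte[], ...>` compares keys by identity):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupBySketch {
    // Group (key, value) pairs into a key -> list-of-values map,
    // mirroring the expectedIndexMutationMap accumulation in the quoted diff.
    static Map<String, List<String>> group(List<String[]> keyedItems) {
        Map<String, List<String>> map = new HashMap<>();
        for (String[] kv : keyedItems) {
            map.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }
        return map;
    }
}
```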







[GitHub] [phoenix] tkhurana commented on a change in pull request #915: PHOENIX-6181 IndexRepairRegionScanner to verify and repair every glob…

Posted by GitBox <gi...@apache.org>.
tkhurana commented on a change in pull request #915:
URL: https://github.com/apache/phoenix/pull/915#discussion_r503669146



##########
File path: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
##########
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME;
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.filter.SkipScanFilter;
+import org.apache.phoenix.hbase.index.parallel.Task;
+import org.apache.phoenix.hbase.index.parallel.TaskBatch;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+public class IndexRepairRegionScanner extends GlobalIndexRegionScanner {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(IndexRepairRegionScanner.class);
+
+    public IndexRepairRegionScanner(final RegionScanner innerScanner,
+                                     final Region region,
+                                     final Scan scan,
+                                     final RegionCoprocessorEnvironment env,
+                                     final UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver)
+            throws IOException {
+        super(innerScanner, region, scan, env, ungroupedAggregateRegionObserver);
+
+        byte[] dataTableName = scan.getAttribute(PHYSICAL_DATA_TABLE_NAME);
+        dataHTable = hTableFactory.getTable(new ImmutableBytesPtr(dataTableName));
+        indexTableTTL = region.getTableDesc().getColumnFamilies()[0].getTimeToLive();
+        try (org.apache.hadoop.hbase.client.Connection connection =
+                     HBaseFactoryProvider.getHConnectionFactory().createConnection(env.getConfiguration())) {
+            regionEndKeys = connection.getRegionLocator(dataHTable.getName()).getEndKeys();
+        }
+    }
+
+    public void prepareExpectedIndexMutations(Result dataRow, Map<byte[], List<Mutation>> expectedIndexMutationMap) throws IOException {
+        Put put = null;
+        Delete del = null;
+        for (Cell cell : dataRow.rawCells()) {
+            if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
+                if (put == null) {
+                    put = new Put(CellUtil.cloneRow(cell));
+                }
+                put.add(cell);
+            } else {
+                if (del == null) {
+                    del = new Delete(CellUtil.cloneRow(cell));
+                }
+                del.addDeleteMarker(cell);
+            }
+        }
+        List<Mutation> indexMutations = prepareIndexMutationsForRebuild(indexMaintainer, put, del);
+        Collections.reverse(indexMutations);
+        for (Mutation mutation : indexMutations) {
+            byte[] indexRowKey = mutation.getRow();
+            List<Mutation> mutationList = expectedIndexMutationMap.get(indexRowKey);
+            if (mutationList == null) {
+                mutationList = new ArrayList<>();
+                mutationList.add(mutation);
+                expectedIndexMutationMap.put(indexRowKey, mutationList);
+            } else {
+                mutationList.add(mutation);
+            }
+        }
+    }
+
+    private void repairIndexRows(Map<byte[], List<Mutation>> indexMutationMap,
+                                 List<Mutation> indexRowsToBeDeleted,
+                                 IndexToolVerificationResult verificationResult) throws IOException {
+        try {
+            int batchSize = 0;
+            List<Mutation> indexUpdates = new ArrayList<Mutation>(maxBatchSize);
+            for (List<Mutation> mutationList : indexMutationMap.values()) {
+                indexUpdates.addAll(mutationList);
+                batchSize += mutationList.size();
+                if (batchSize >= maxBatchSize) {
+                    ungroupedAggregateRegionObserver.checkForRegionClosing();
+                    region.batchMutate(indexUpdates.toArray(new Mutation[indexUpdates.size()]),
+                            HConstants.NO_NONCE, HConstants.NO_NONCE);
+                    batchSize = 0;
+                    indexUpdates = new ArrayList<Mutation>(maxBatchSize);
+                }
+            }
+            if (batchSize > 0) {
+                ungroupedAggregateRegionObserver.checkForRegionClosing();
+                region.batchMutate(indexUpdates.toArray(new Mutation[indexUpdates.size()]),
+                        HConstants.NO_NONCE, HConstants.NO_NONCE);
+            }
+            batchSize = 0;
+            indexUpdates = new ArrayList<Mutation>(maxBatchSize);
+            for (Mutation mutation : indexRowsToBeDeleted) {
+                indexUpdates.add(mutation);
+                batchSize ++;
+                if (batchSize >= maxBatchSize) {
+                    ungroupedAggregateRegionObserver.checkForRegionClosing();
+                    region.batchMutate(indexUpdates.toArray(new Mutation[indexUpdates.size()]),
+                            HConstants.NO_NONCE, HConstants.NO_NONCE);
+                    batchSize = 0;
+                    indexUpdates = new ArrayList<Mutation>(maxBatchSize);
+                }
+            }
+            if (batchSize > 0) {
+                ungroupedAggregateRegionObserver.checkForRegionClosing();
+                region.batchMutate(indexUpdates.toArray(new Mutation[indexUpdates.size()]),
+                        HConstants.NO_NONCE, HConstants.NO_NONCE);

Review comment:
       Question: Here we are calling `region.batchMutate` to update the index table, but in IndexRebuildRegionScanner we call `indexHTable.batch()`. Why the difference?
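   [Editor's note] Independent of which write API is used, the quoted `repairIndexRows` follows a flush-on-threshold batching loop. A generic, self-contained sketch of that pattern (hypothetical `Sink` interface standing in for the HBase mutation call):

```java
import java.util.ArrayList;
import java.util.List;

public class BatchingSketch {
    interface Sink { void flush(List<String> batch); }

    // Accumulate items and flush whenever the batch reaches maxBatchSize,
    // then flush any final partial batch; returns the number of flushes.
    static int applyInBatches(List<String> items, int maxBatchSize, Sink sink) {
        int flushes = 0;
        List<String> batch = new ArrayList<>(maxBatchSize);
        for (String item : items) {
            batch.add(item);
            if (batch.size() >= maxBatchSize) {
                sink.flush(batch);
                flushes++;
                batch = new ArrayList<>(maxBatchSize);
            }
        }
        if (!batch.isEmpty()) {  // final partial batch
            sink.flush(batch);
            flushes++;
        }
        return flushes;
    }
}
```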







[GitHub] [phoenix] kadirozde commented on a change in pull request #915: PHOENIX-6181 IndexRepairRegionScanner to verify and repair every glob…

Posted by GitBox <gi...@apache.org>.
kadirozde commented on a change in pull request #915:
URL: https://github.com/apache/phoenix/pull/915#discussion_r504916360



##########
File path: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
##########
@@ -240,4 +320,1033 @@ protected boolean isColumnIncluded(Cell cell) {
         byte[] qualifier = CellUtil.cloneQualifier(cell);
         return set.contains(qualifier);
     }
+    @VisibleForTesting
+    public boolean shouldVerify(IndexTool.IndexVerifyType verifyType,
+                                byte[] indexRowKey, Scan scan, Region region, IndexMaintainer indexMaintainer,
+                                IndexVerificationResultRepository verificationResultRepository, boolean shouldVerifyCheckDone) throws IOException {
+        this.verifyType = verifyType;
+        this.indexRowKeyforReadRepair = indexRowKey;
+        this.scan = scan;
+        this.region = region;
+        this.indexMaintainer = indexMaintainer;
+        this.verificationResultRepository = verificationResultRepository;
+        this.shouldVerifyCheckDone = shouldVerifyCheckDone;
+        return shouldVerify();
+    }
+
+    protected boolean shouldVerify() throws IOException {
+        // In case of read repair, proceed with rebuild
+        // All other types of rebuilds/verification should be incrementally performed if appropriate param is passed
+        byte[] lastVerifyTimeValue = scan.getAttribute(UngroupedAggregateRegionObserver.INDEX_RETRY_VERIFY);
+        Long lastVerifyTime = lastVerifyTimeValue == null ? 0 : Bytes.toLong(lastVerifyTimeValue);
+        if(indexRowKeyforReadRepair != null || lastVerifyTime == 0 || shouldVerifyCheckDone) {

Review comment:
       lastVerifyTime is not a global variable. We can improve this method by checking shouldVerifyCheckDone at the entry of this method.
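   [Editor's note] The refactor suggested above, checking `shouldVerifyCheckDone` at the entry of the method, can be sketched with plain booleans (a hypothetical simplification; the real method reads scan attributes and queries the verification-result repository):

```java
public class ShouldVerifySketch {
    private boolean shouldVerifyCheckDone;

    // readRepair stands in for indexRowKeyforReadRepair != null,
    // resultFound for a previously persisted verification result.
    boolean shouldVerify(boolean readRepair, long lastVerifyTime, boolean resultFound) {
        // Suggested refactor: bail out before looking at lastVerifyTime at all.
        if (shouldVerifyCheckDone) {
            return true;
        }
        // Read repair, or no incremental-verify timestamp: always verify.
        if (readRepair || lastVerifyTime == 0) {
            return true;
        }
        shouldVerifyCheckDone = true;
        // Verify only if no result was already recorded for lastVerifyTime.
        return !resultFound;
    }
}
```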







[GitHub] [phoenix] kadirozde commented on a change in pull request #915: PHOENIX-6181 IndexRepairRegionScanner to verify and repair every glob…

Posted by GitBox <gi...@apache.org>.
kadirozde commented on a change in pull request #915:
URL: https://github.com/apache/phoenix/pull/915#discussion_r506685653



##########
File path: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
##########
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME;
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.filter.SkipScanFilter;
+import org.apache.phoenix.hbase.index.parallel.Task;
+import org.apache.phoenix.hbase.index.parallel.TaskBatch;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+

Review comment:
       Done







[GitHub] [phoenix] gokceni commented on a change in pull request #915: PHOENIX-6181 IndexRepairRegionScanner to verify and repair every glob…

Posted by GitBox <gi...@apache.org>.
gokceni commented on a change in pull request #915:
URL: https://github.com/apache/phoenix/pull/915#discussion_r504315024



##########
File path: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
##########
@@ -1103,13 +1104,21 @@ private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Reg
             if (oldCoproc) {
                 return new IndexerRegionScanner(scanner, region, scan, env, this);
             } else {
-                return new IndexRebuildRegionScanner(scanner, region, scan, env, this);
+                if (region.getTableDesc().hasCoprocessor(IndexRegionObserver.class.getCanonicalName())) {

Review comment:
       Is this an optimization for checking whether this table is an index table, @kadirozde, rather than querying the PTable and looking at its type?







[GitHub] [phoenix] kadirozde commented on a change in pull request #915: PHOENIX-6181 IndexRepairRegionScanner to verify and repair every glob…

Posted by GitBox <gi...@apache.org>.
kadirozde commented on a change in pull request #915:
URL: https://github.com/apache/phoenix/pull/915#discussion_r504922136



##########
File path: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
##########
@@ -240,4 +320,1033 @@ protected boolean isColumnIncluded(Cell cell) {
         byte[] qualifier = CellUtil.cloneQualifier(cell);
         return set.contains(qualifier);
     }
+    @VisibleForTesting
+    public boolean shouldVerify(IndexTool.IndexVerifyType verifyType,
+                                byte[] indexRowKey, Scan scan, Region region, IndexMaintainer indexMaintainer,
+                                IndexVerificationResultRepository verificationResultRepository, boolean shouldVerifyCheckDone) throws IOException {
+        this.verifyType = verifyType;
+        this.indexRowKeyforReadRepair = indexRowKey;
+        this.scan = scan;
+        this.region = region;
+        this.indexMaintainer = indexMaintainer;
+        this.verificationResultRepository = verificationResultRepository;
+        this.shouldVerifyCheckDone = shouldVerifyCheckDone;
+        return shouldVerify();
+    }
+
+    protected boolean shouldVerify() throws IOException {
+        // In case of read repair, proceed with rebuild
+        // All other types of rebuilds/verification should be incrementally performed if appropriate param is passed
+        byte[] lastVerifyTimeValue = scan.getAttribute(UngroupedAggregateRegionObserver.INDEX_RETRY_VERIFY);
+        Long lastVerifyTime = lastVerifyTimeValue == null ? 0 : Bytes.toLong(lastVerifyTimeValue);
+        if(indexRowKeyforReadRepair != null || lastVerifyTime == 0 || shouldVerifyCheckDone) {
+            return true;
+        }
+
+        IndexToolVerificationResult verificationResultTemp = verificationResultRepository
+                .getVerificationResult(lastVerifyTime, scan, region, indexMaintainer.getIndexTableName()) ;
+        if(verificationResultTemp != null) {
+            verificationResult = verificationResultTemp;
+        }
+        shouldVerifyCheckDone = true;
+        return verificationResultTemp == null;
+    }
+
+    @Override
+    public HRegionInfo getRegionInfo() {
+        return region.getRegionInfo();
+    }
+
+    @Override
+    public boolean isFilterDone() {
+        return false;
+    }
+
+    private void closeTables() throws IOException {
+        hTableFactory.shutdown();
+        if (indexHTable != null) {
+            indexHTable.close();
+        }
+        if (dataHTable != null) {
+            dataHTable.close();
+        }
+    }
+    @Override
+    public void close() throws IOException {
+        innerScanner.close();
+        if (indexRowKeyforReadRepair != null) {
+            closeTables();
+            return;
+        }
+        if (verify) {
+            try {
+                if (verificationResultRepository != null) {
+                    verificationResultRepository.logToIndexToolResultTable(verificationResult,
+                            verifyType, region.getRegionInfo().getRegionName(), skipped);
+                }
+            } finally {
+                this.pool.stop("IndexRegionObserverRegionScanner is closing");
+                closeTables();
+                if (verificationResultRepository != null) {
+                    verificationResultRepository.close();
+                }
+                if (verificationOutputRepository != null) {
+                    verificationOutputRepository.close();
+                }
+            }
+        }
+        else {
+            this.pool.stop("IndexRegionObserverRegionScanner is closing");
+            closeTables();
+        }
+    }
+
+    @VisibleForTesting
+    public int setIndexTableTTL(int ttl) {
+        indexTableTTL = ttl;
+        return 0;
+    }
+
+    @VisibleForTesting
+    public int setIndexMaintainer(IndexMaintainer indexMaintainer) {
+        this.indexMaintainer = indexMaintainer;
+        return 0;
+    }
+
+    @VisibleForTesting
+    public long setMaxLookBackInMills(long maxLookBackInMills) {
+        this.maxLookBackInMills = maxLookBackInMills;
+        return 0;
+    }
+
+    public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
+                                          String errorMsg, boolean isBeforeRebuild,
+                                          IndexVerificationOutputRepository.IndexVerificationErrorType errorType) throws IOException {
+        logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs, errorMsg, null,
+                null, isBeforeRebuild, errorType);
+    }
+
+    @VisibleForTesting
+    public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
+                                          String errorMsg, byte[] expectedValue, byte[] actualValue, boolean isBeforeRebuild,
+                                          IndexVerificationOutputRepository.IndexVerificationErrorType errorType) throws IOException {
+        ungroupedAggregateRegionObserver.checkForRegionClosing();
+        verificationOutputRepository.logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs,
+                errorMsg, expectedValue, actualValue, scan.getTimeRange().getMax(),
+                region.getRegionInfo().getTable().getName(), isBeforeRebuild, errorType);
+    }
+
+    private static Cell getCell(Mutation m, byte[] family, byte[] qualifier) {
+        List<Cell> cellList = m.getFamilyCellMap().get(family);
+        if (cellList == null) {
+            return null;
+        }
+        for (Cell cell : cellList) {
+            if (CellUtil.matchingQualifier(cell, qualifier)) {
+                return cell;
+            }
+        }
+        return null;
+    }
+
+    private void logMismatch(Mutation expected, Mutation actual, int iteration, IndexToolVerificationResult.PhaseResult verificationPhaseResult, boolean isBeforeRebuild) throws IOException {
+        if (getTimestamp(expected) != getTimestamp(actual)) {
+            String errorMsg = "Not matching timestamp";
+            byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
+            logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual),
+                    errorMsg, null, null, isBeforeRebuild, INVALID_ROW);
+            return;
+        }
+        int expectedCellCount = 0;
+        for (List<Cell> cells : expected.getFamilyCellMap().values()) {
+            if (cells == null) {
+                continue;
+            }
+            for (Cell expectedCell : cells) {
+                expectedCellCount++;
+                byte[] family = CellUtil.cloneFamily(expectedCell);
+                byte[] qualifier = CellUtil.cloneQualifier(expectedCell);
+                Cell actualCell = getCell(actual, family, qualifier);
+                if (actualCell == null ||
+                        !CellUtil.matchingType(expectedCell, actualCell)) {
+                    byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
+                    String errorMsg = "Missing cell (in iteration " + iteration + ") " + Bytes.toString(family) + ":" + Bytes.toString(qualifier);
+                    logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected),
+                            getTimestamp(actual), errorMsg, isBeforeRebuild, INVALID_ROW);
+                    verificationPhaseResult.setIndexHasMissingCellsCount(verificationPhaseResult.getIndexHasMissingCellsCount() + 1);
+                    return;
+                }
+                if (!CellUtil.matchingValue(actualCell, expectedCell)) {
+                    String errorMsg = "Not matching value (in iteration " + iteration + ") for " + Bytes.toString(family) + ":" + Bytes.toString(qualifier);

Review comment:
       Did you mean 5923? The single-cell format for indexes will be optional. Also, please note that there will be a separate cell for each column family.







[GitHub] [phoenix] kadirozde commented on a change in pull request #915: PHOENIX-6181 IndexRepairRegionScanner to verify and repair every glob…

Posted by GitBox <gi...@apache.org>.
kadirozde commented on a change in pull request #915:
URL: https://github.com/apache/phoenix/pull/915#discussion_r504909513



##########
File path: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
##########
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME;
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.filter.SkipScanFilter;
+import org.apache.phoenix.hbase.index.parallel.Task;
+import org.apache.phoenix.hbase.index.parallel.TaskBatch;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+public class IndexRepairRegionScanner extends GlobalIndexRegionScanner {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(IndexRepairRegionScanner.class);
+
+    public IndexRepairRegionScanner(final RegionScanner innerScanner,
+                                     final Region region,
+                                     final Scan scan,
+                                     final RegionCoprocessorEnvironment env,
+                                     final UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver)
+            throws IOException {
+        super(innerScanner, region, scan, env, ungroupedAggregateRegionObserver);
+
+        byte[] dataTableName = scan.getAttribute(PHYSICAL_DATA_TABLE_NAME);
+        dataHTable = hTableFactory.getTable(new ImmutableBytesPtr(dataTableName));
+        indexTableTTL = region.getTableDesc().getColumnFamilies()[0].getTimeToLive();
+        try (org.apache.hadoop.hbase.client.Connection connection =
+                     HBaseFactoryProvider.getHConnectionFactory().createConnection(env.getConfiguration())) {
+            regionEndKeys = connection.getRegionLocator(dataHTable.getName()).getEndKeys();
+        }
+    }
+
+    public void prepareExpectedIndexMutations(Result dataRow, Map<byte[], List<Mutation>> expectedIndexMutationMap) throws IOException {
+        Put put = null;
+        Delete del = null;
+        for (Cell cell : dataRow.rawCells()) {
+            if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
+                if (put == null) {
+                    put = new Put(CellUtil.cloneRow(cell));
+                }
+                put.add(cell);
+            } else {
+                if (del == null) {
+                    del = new Delete(CellUtil.cloneRow(cell));
+                }
+                del.addDeleteMarker(cell);
+            }
+        }
+        List<Mutation> indexMutations = prepareIndexMutationsForRebuild(indexMaintainer, put, del);
+        Collections.reverse(indexMutations);
+        for (Mutation mutation : indexMutations) {
+            byte[] indexRowKey = mutation.getRow();
+            List<Mutation> mutationList = expectedIndexMutationMap.get(indexRowKey);
+            if (mutationList == null) {
+                mutationList = new ArrayList<>();
+                mutationList.add(mutation);
+                expectedIndexMutationMap.put(indexRowKey, mutationList);
+            } else {
+                mutationList.add(mutation);
+            }
+        }
+    }
+
+    private void repairIndexRows(Map<byte[], List<Mutation>> indexMutationMap,
+                                 List<Mutation> indexRowsToBeDeleted,
+                                 IndexToolVerificationResult verificationResult) throws IOException {
+        try {
+            int batchSize = 0;
+            List<Mutation> indexUpdates = new ArrayList<Mutation>(maxBatchSize);
+            for (List<Mutation> mutationList : indexMutationMap.values()) {
+                indexUpdates.addAll(mutationList);
+                batchSize += mutationList.size();
+                if (batchSize >= maxBatchSize) {
+                    ungroupedAggregateRegionObserver.checkForRegionClosing();
+                    region.batchMutate(indexUpdates.toArray(new Mutation[indexUpdates.size()]),
+                            HConstants.NO_NONCE, HConstants.NO_NONCE);
+                    batchSize = 0;
+                    indexUpdates = new ArrayList<Mutation>(maxBatchSize);
+                }
+            }
+            if (batchSize > 0) {
+                ungroupedAggregateRegionObserver.checkForRegionClosing();
+                region.batchMutate(indexUpdates.toArray(new Mutation[indexUpdates.size()]),
+                        HConstants.NO_NONCE, HConstants.NO_NONCE);
+            }
+            batchSize = 0;
+            indexUpdates = new ArrayList<Mutation>(maxBatchSize);
+            for (Mutation mutation : indexRowsToBeDeleted) {
+                indexUpdates.add(mutation);
+                batchSize ++;
+                if (batchSize >= maxBatchSize) {
+                    ungroupedAggregateRegionObserver.checkForRegionClosing();
+                    region.batchMutate(indexUpdates.toArray(new Mutation[indexUpdates.size()]),
+                            HConstants.NO_NONCE, HConstants.NO_NONCE);
+                    batchSize = 0;
+                    indexUpdates = new ArrayList<Mutation>(maxBatchSize);
+                }
+            }
+            if (batchSize > 0) {
+                ungroupedAggregateRegionObserver.checkForRegionClosing();
+                region.batchMutate(indexUpdates.toArray(new Mutation[indexUpdates.size()]),
+                        HConstants.NO_NONCE, HConstants.NO_NONCE);

Review comment:
       I will reduce the code duplication here. When the index table is local (in the repair case), we use the region API, and when the index table is remote (in the rebuild case), we use the table API.
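       One way the duplicated flush loop in repairIndexRows could be factored out is sketched below. This is a hypothetical, self-contained illustration, not the actual follow-up commit: the MutationSink interface and all names are invented, and String stands in for Mutation so the sketch runs without HBase on the classpath. In the repair case the sink would wrap region.batchMutate(...); in the rebuild case, a remote table API call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: one generic batching routine, with the
// "local region API vs. remote table API" choice hidden behind a callback.
public class BatchingSketch {
    interface MutationSink {
        // Repair: wraps region.batchMutate(batch). Rebuild: wraps a
        // remote write through the table API. (Illustrative only.)
        void flush(List<String> batch) throws Exception;
    }

    // Accumulates mutations and flushes whenever maxBatchSize is reached,
    // then flushes the final partial batch — the logic that is currently
    // duplicated four times in repairIndexRows.
    static void applyInBatches(Iterable<String> mutations, int maxBatchSize,
                               MutationSink sink) throws Exception {
        List<String> batch = new ArrayList<>(maxBatchSize);
        for (String m : mutations) {
            batch.add(m);
            if (batch.size() >= maxBatchSize) {
                sink.flush(batch);
                batch = new ArrayList<>(maxBatchSize);
            }
        }
        if (!batch.isEmpty()) {
            sink.flush(batch); // final partial batch
        }
    }

    public static void main(String[] args) throws Exception {
        List<List<String>> flushed = new ArrayList<>();
        applyInBatches(Arrays.asList("m1", "m2", "m3", "m4", "m5"), 2,
                flushed::add);
        System.out.println(flushed.size()); // 3 batches: [m1,m2] [m3,m4] [m5]
    }
}
```

       Both callers (the expected-mutation map and the rows-to-delete list) would then share applyInBatches and differ only in the sink they pass.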







[GitHub] [phoenix] stoty commented on pull request #915: PHOENIX-6181 IndexRepairRegionScanner to verify and repair every glob…

Posted by GitBox <gi...@apache.org>.
stoty commented on pull request #915:
URL: https://github.com/apache/phoenix/pull/915#issuecomment-709585867


   :broken_heart: **-1 overall**
   
   
   
   
   
   
   | Vote | Subsystem | Runtime | Comment |
   |:----:|----------:|--------:|:--------|
   | +0 :ok: |  reexec  |   1m  8s |  Docker mode activated.  |
   ||| _ Prechecks _ |
   | +1 :green_heart: |  dupname  |   0m  0s |  No case conflicting files found.  |
   | +1 :green_heart: |  hbaseanti  |   0m  0s |  Patch does not have any anti-patterns.  |
   | +1 :green_heart: |  @author  |   0m  0s |  The patch does not contain any @author tags.  |
   | +1 :green_heart: |  test4tests  |   0m  0s |  The patch appears to include 2 new or modified test files.  |
   ||| _ 4.x Compile Tests _ |
   | +1 :green_heart: |  mvninstall  |  32m 47s |  4.x passed  |
   | +1 :green_heart: |  compile  |   0m 58s |  4.x passed  |
   | +1 :green_heart: |  checkstyle  |   1m  0s |  4.x passed  |
   | +1 :green_heart: |  javadoc  |   0m 49s |  4.x passed  |
   | +0 :ok: |  spotbugs  |   3m  8s |  phoenix-core in 4.x has 956 extant spotbugs warnings.  |
   ||| _ Patch Compile Tests _ |
   | +1 :green_heart: |  mvninstall  |  29m  2s |  the patch passed  |
   | +1 :green_heart: |  compile  |   0m 58s |  the patch passed  |
   | +1 :green_heart: |  javac  |   0m 58s |  the patch passed  |
   | -1 :x: |  checkstyle  |   1m  3s |  phoenix-core: The patch generated 575 new + 1421 unchanged - 385 fixed = 1996 total (was 1806)  |
   | +1 :green_heart: |  whitespace  |   0m  0s |  The patch has no whitespace issues.  |
   | +1 :green_heart: |  javadoc  |   0m 48s |  the patch passed  |
   | -1 :x: |  spotbugs  |   3m 21s |  phoenix-core generated 1 new + 955 unchanged - 1 fixed = 956 total (was 956)  |
   ||| _ Other Tests _ |
   | -1 :x: |  unit  |   1m 26s |  phoenix-core in the patch failed.  |
   | +1 :green_heart: |  asflicense  |   0m  9s |  The patch does not generate ASF License warnings.  |
   |  |   |  77m 59s |   |
   
   
   | Reason | Tests |
   |-------:|:------|
   | FindBugs | module:phoenix-core |
   |  |  org.apache.phoenix.coprocessor.GlobalIndexRegionScanner.shouldVerify(IndexTool$IndexVerifyType, byte[], Scan, Region, IndexMaintainer, IndexVerificationResultRepository, boolean) may expose internal representation by storing an externally mutable object into GlobalIndexRegionScanner.indexRowKeyforReadRepair  At GlobalIndexRegionScanner.java:IndexVerificationResultRepository, boolean) may expose internal representation by storing an externally mutable object into GlobalIndexRegionScanner.indexRowKeyforReadRepair  At GlobalIndexRegionScanner.java:[line 328] |
   | Failed junit tests | phoenix.index.ShouldVerifyTest |
   
   
   | Subsystem | Report/Notes |
   |----------:|:-------------|
   | Docker | ClientAPI=1.40 ServerAPI=1.40 base: https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-PreCommit-GitHub-PR/job/PR-915/2/artifact/yetus-general-check/output/Dockerfile |
   | GITHUB PR | https://github.com/apache/phoenix/pull/915 |
   | Optional Tests | dupname asflicense javac javadoc unit spotbugs hbaseanti checkstyle compile |
   | uname | Linux 23e85e95bce0 4.15.0-112-generic #113-Ubuntu SMP Thu Jul 9 23:41:39 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux |
   | Build tool | maven |
   | Personality | dev/phoenix-personality.sh |
   | git revision | 4.x / 264310b |
   | Default Java | Private Build-1.8.0_242-8u242-b08-0ubuntu3~16.04-b08 |
   | checkstyle | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-PreCommit-GitHub-PR/job/PR-915/2/artifact/yetus-general-check/output/diff-checkstyle-phoenix-core.txt |
   | spotbugs | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-PreCommit-GitHub-PR/job/PR-915/2/artifact/yetus-general-check/output/new-spotbugs-phoenix-core.html |
   | unit | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-PreCommit-GitHub-PR/job/PR-915/2/artifact/yetus-general-check/output/patch-unit-phoenix-core.txt |
   |  Test Results | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-PreCommit-GitHub-PR/job/PR-915/2/testReport/ |
   | Max. process+thread count | 442 (vs. ulimit of 30000) |
   | modules | C: phoenix-core U: phoenix-core |
   | Console output | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-PreCommit-GitHub-PR/job/PR-915/2/console |
   | versions | git=2.7.4 maven=3.3.9 spotbugs=4.1.3 |
   | Powered by | Apache Yetus 0.12.0 https://yetus.apache.org |
   
   
   This message was automatically generated.
   
   





[GitHub] [phoenix] gokceni commented on a change in pull request #915: PHOENIX-6181 IndexRepairRegionScanner to verify and repair every glob…

Posted by GitBox <gi...@apache.org>.
gokceni commented on a change in pull request #915:
URL: https://github.com/apache/phoenix/pull/915#discussion_r504293476



##########
File path: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
##########
@@ -1103,13 +1104,21 @@ private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Reg
             if (oldCoproc) {
                 return new IndexerRegionScanner(scanner, region, scan, env, this);
             } else {
-                return new IndexRebuildRegionScanner(scanner, region, scan, env, this);
+                if (region.getTableDesc().hasCoprocessor(IndexRegionObserver.class.getCanonicalName())) {

Review comment:
       Yes, you are right, @tkhurana; I meant the data table. Don't we remove Indexer and add IndexRegionObserver during upgrade? 







[GitHub] [phoenix] tkhurana commented on a change in pull request #915: PHOENIX-6181 IndexRepairRegionScanner to verify and repair every glob…

Posted by GitBox <gi...@apache.org>.
tkhurana commented on a change in pull request #915:
URL: https://github.com/apache/phoenix/pull/915#discussion_r504288681



##########
File path: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
##########
@@ -1103,13 +1104,21 @@ private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Reg
             if (oldCoproc) {
                 return new IndexerRegionScanner(scanner, region, scan, env, this);
             } else {
-                return new IndexRebuildRegionScanner(scanner, region, scan, env, this);
+                if (region.getTableDesc().hasCoprocessor(IndexRegionObserver.class.getCanonicalName())) {

Review comment:
       @gokceni IndexRegionObserver is only on the data table. The index table has neither Indexer nor IndexRegionObserver.







[GitHub] [phoenix] kadirozde commented on a change in pull request #915: PHOENIX-6181 IndexRepairRegionScanner to verify and repair every glob…

Posted by GitBox <gi...@apache.org>.
kadirozde commented on a change in pull request #915:
URL: https://github.com/apache/phoenix/pull/915#discussion_r504917195



##########
File path: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
##########
@@ -240,4 +320,1033 @@ protected boolean isColumnIncluded(Cell cell) {
         byte[] qualifier = CellUtil.cloneQualifier(cell);
         return set.contains(qualifier);
     }
+    @VisibleForTesting
+    public boolean shouldVerify(IndexTool.IndexVerifyType verifyType,
+                                byte[] indexRowKey, Scan scan, Region region, IndexMaintainer indexMaintainer,
+                                IndexVerificationResultRepository verificationResultRepository, boolean shouldVerifyCheckDone) throws IOException {
+        this.verifyType = verifyType;
+        this.indexRowKeyforReadRepair = indexRowKey;
+        this.scan = scan;
+        this.region = region;
+        this.indexMaintainer = indexMaintainer;
+        this.verificationResultRepository = verificationResultRepository;
+        this.shouldVerifyCheckDone = shouldVerifyCheckDone;
+        return shouldVerify();
+    }
+
+    protected boolean shouldVerify() throws IOException {
+        // In case of read repair, proceed with rebuild
+        // All other types of rebuilds/verification should be incrementally performed if appropriate param is passed
+        byte[] lastVerifyTimeValue = scan.getAttribute(UngroupedAggregateRegionObserver.INDEX_RETRY_VERIFY);
+        Long lastVerifyTime = lastVerifyTimeValue == null ? 0 : Bytes.toLong(lastVerifyTimeValue);
+        if(indexRowKeyforReadRepair != null || lastVerifyTime == 0 || shouldVerifyCheckDone) {
+            return true;
+        }
+
+        IndexToolVerificationResult verificationResultTemp = verificationResultRepository
+                .getVerificationResult(lastVerifyTime, scan, region, indexMaintainer.getIndexTableName()) ;
+        if(verificationResultTemp != null) {
+            verificationResult = verificationResultTemp;
+        }
+        shouldVerifyCheckDone = true;
+        return verificationResultTemp == null;
+    }
+
+    @Override
+    public HRegionInfo getRegionInfo() {
+        return region.getRegionInfo();
+    }
+
+    @Override
+    public boolean isFilterDone() {
+        return false;
+    }
+
+    private void closeTables() throws IOException {
+        hTableFactory.shutdown();
+        if (indexHTable != null) {
+            indexHTable.close();
+        }
+        if (dataHTable != null) {
+            dataHTable.close();
+        }
+    }
+    @Override
+    public void close() throws IOException {
+        innerScanner.close();
+        if (indexRowKeyforReadRepair != null) {
+            closeTables();
+            return;
+        }
+        if (verify) {
+            try {
+                if (verificationResultRepository != null) {
+                    verificationResultRepository.logToIndexToolResultTable(verificationResult,
+                            verifyType, region.getRegionInfo().getRegionName(), skipped);
+                }
+            } finally {
+                this.pool.stop("IndexRegionObserverRegionScanner is closing");
+                closeTables();
+                if (verificationResultRepository != null) {
+                    verificationResultRepository.close();
+                }
+                if (verificationOutputRepository != null) {
+                    verificationOutputRepository.close();
+                }
+            }
+        }
+        else {
+            this.pool.stop("IndexRegionObserverRegionScanner is closing");
+            closeTables();
+        }
+    }
+
+    @VisibleForTesting
+    public int setIndexTableTTL(int ttl) {
+        indexTableTTL = ttl;
+        return 0;

Review comment:
       This method is used only for the unit test. @swaroopak, can you answer the question?







[GitHub] [phoenix] tkhurana commented on a change in pull request #915: PHOENIX-6181 IndexRepairRegionScanner to verify and repair every glob…

Posted by GitBox <gi...@apache.org>.
tkhurana commented on a change in pull request #915:
URL: https://github.com/apache/phoenix/pull/915#discussion_r503665394



##########
File path: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
##########
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME;
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.filter.SkipScanFilter;
+import org.apache.phoenix.hbase.index.parallel.Task;
+import org.apache.phoenix.hbase.index.parallel.TaskBatch;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+public class IndexRepairRegionScanner extends GlobalIndexRegionScanner {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(IndexRepairRegionScanner.class);
+
+    public IndexRepairRegionScanner(final RegionScanner innerScanner,
+                                     final Region region,
+                                     final Scan scan,
+                                     final RegionCoprocessorEnvironment env,
+                                     final UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver)
+            throws IOException {
+        super(innerScanner, region, scan, env, ungroupedAggregateRegionObserver);
+
+        byte[] dataTableName = scan.getAttribute(PHYSICAL_DATA_TABLE_NAME);
+        dataHTable = hTableFactory.getTable(new ImmutableBytesPtr(dataTableName));
+        indexTableTTL = region.getTableDesc().getColumnFamilies()[0].getTimeToLive();
+        try (org.apache.hadoop.hbase.client.Connection connection =
+                     HBaseFactoryProvider.getHConnectionFactory().createConnection(env.getConfiguration())) {
+            regionEndKeys = connection.getRegionLocator(dataHTable.getName()).getEndKeys();
+        }
+    }
+
+    public void prepareExpectedIndexMutations(Result dataRow, Map<byte[], List<Mutation>> expectedIndexMutationMap) throws IOException {
+        Put put = null;
+        Delete del = null;
+        for (Cell cell : dataRow.rawCells()) {
+            if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
+                if (put == null) {
+                    put = new Put(CellUtil.cloneRow(cell));
+                }
+                put.add(cell);
+            } else {
+                if (del == null) {
+                    del = new Delete(CellUtil.cloneRow(cell));
+                }
+                del.addDeleteMarker(cell);
+            }
+        }
+        List<Mutation> indexMutations = prepareIndexMutationsForRebuild(indexMaintainer, put, del);
+        Collections.reverse(indexMutations);
+        for (Mutation mutation : indexMutations) {
+            byte[] indexRowKey = mutation.getRow();
+            List<Mutation> mutationList = expectedIndexMutationMap.get(indexRowKey);
+            if (mutationList == null) {
+                mutationList = new ArrayList<>();
+                mutationList.add(mutation);
+                expectedIndexMutationMap.put(indexRowKey, mutationList);
+            } else {
+                mutationList.add(mutation);
+            }
+        }
+    }
+
+    private void repairIndexRows(Map<byte[], List<Mutation>> indexMutationMap,
+                                 List<Mutation> indexRowsToBeDeleted,
+                                 IndexToolVerificationResult verificationResult) throws IOException {
+        try {
+            int batchSize = 0;
+            List<Mutation> indexUpdates = new ArrayList<Mutation>(maxBatchSize);
+            for (List<Mutation> mutationList : indexMutationMap.values()) {
+                indexUpdates.addAll(mutationList);
+                batchSize += mutationList.size();
+                if (batchSize >= maxBatchSize) {
+                    ungroupedAggregateRegionObserver.checkForRegionClosing();
+                    region.batchMutate(indexUpdates.toArray(new Mutation[indexUpdates.size()]),
+                            HConstants.NO_NONCE, HConstants.NO_NONCE);
+                    batchSize = 0;
+                    indexUpdates = new ArrayList<Mutation>(maxBatchSize);
+                }
+            }
+            if (batchSize > 0) {
+                ungroupedAggregateRegionObserver.checkForRegionClosing();
+                region.batchMutate(indexUpdates.toArray(new Mutation[indexUpdates.size()]),
+                        HConstants.NO_NONCE, HConstants.NO_NONCE);
+            }
+            batchSize = 0;
+            indexUpdates = new ArrayList<Mutation>(maxBatchSize);
+            for (Mutation mutation : indexRowsToBeDeleted) {
+                indexUpdates.add(mutation);
+                batchSize ++;
+                if (batchSize >= maxBatchSize) {
+                    ungroupedAggregateRegionObserver.checkForRegionClosing();
+                    region.batchMutate(indexUpdates.toArray(new Mutation[indexUpdates.size()]),
+                            HConstants.NO_NONCE, HConstants.NO_NONCE);
+                    batchSize = 0;
+                    indexUpdates = new ArrayList<Mutation>(maxBatchSize);
+                }
+            }
+            if (batchSize > 0) {
+                ungroupedAggregateRegionObserver.checkForRegionClosing();
+                region.batchMutate(indexUpdates.toArray(new Mutation[indexUpdates.size()]),
+                        HConstants.NO_NONCE, HConstants.NO_NONCE);

Review comment:
       The same logic — checking whether the region is closing and then sending the mutations — is duplicated in several places; we could move it into a helper function to reduce the duplication.
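A minimal sketch of the suggested helper, with hypothetical names: in the real code the pending list would hold HBase `Mutation`s and the flush would call `checkForRegionClosing()` followed by `region.batchMutate(...)`; here a counter stands in for the batch write so the pattern is runnable on its own.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the refactor suggested above: accumulate updates and
// flush them whenever the batch is full (or unconditionally at the end),
// so the check-then-mutate block exists in exactly one place.
public class BatchFlusher {
    static int flushCount = 0; // stands in for region.batchMutate(...) calls

    // Hypothetical helper replacing the duplicated blocks in repairIndexRows.
    static <T> void flushIfNeeded(List<T> pending, int maxBatchSize, boolean force) {
        if (pending.isEmpty()) {
            return;
        }
        if (force || pending.size() >= maxBatchSize) {
            // real code: ungroupedAggregateRegionObserver.checkForRegionClosing();
            //            region.batchMutate(pending.toArray(new Mutation[0]), ...);
            flushCount++;
            pending.clear();
        }
    }

    public static void main(String[] args) {
        List<String> pending = new ArrayList<>();
        int maxBatchSize = 3;
        for (int i = 0; i < 7; i++) {
            pending.add("mutation-" + i);
            flushIfNeeded(pending, maxBatchSize, false);
        }
        flushIfNeeded(pending, maxBatchSize, true); // flush the final partial batch
        // 7 mutations with batch size 3 -> two full batches plus one final flush
        System.out.println(flushCount); // prints 3
    }
}
```

The callers then shrink to "add, maybe flush" loops plus one forced flush at the end, which is the duplication the comment is pointing at.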







[GitHub] [phoenix] gokceni commented on a change in pull request #915: PHOENIX-6181 IndexRepairRegionScanner to verify and repair every glob…

Posted by GitBox <gi...@apache.org>.
gokceni commented on a change in pull request #915:
URL: https://github.com/apache/phoenix/pull/915#discussion_r504279338



##########
File path: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
##########
@@ -240,4 +320,1033 @@ protected boolean isColumnIncluded(Cell cell) {
         byte[] qualifier = CellUtil.cloneQualifier(cell);
         return set.contains(qualifier);
     }
+    @VisibleForTesting
+    public boolean shouldVerify(IndexTool.IndexVerifyType verifyType,
+                                byte[] indexRowKey, Scan scan, Region region, IndexMaintainer indexMaintainer,
+                                IndexVerificationResultRepository verificationResultRepository, boolean shouldVerifyCheckDone) throws IOException {
+        this.verifyType = verifyType;
+        this.indexRowKeyforReadRepair = indexRowKey;
+        this.scan = scan;
+        this.region = region;
+        this.indexMaintainer = indexMaintainer;
+        this.verificationResultRepository = verificationResultRepository;
+        this.shouldVerifyCheckDone = shouldVerifyCheckDone;
+        return shouldVerify();
+    }
+
+    protected boolean shouldVerify() throws IOException {
+        // In case of read repair, proceed with rebuild
+        // All other types of rebuilds/verification should be incrementally performed if appropriate param is passed
+        byte[] lastVerifyTimeValue = scan.getAttribute(UngroupedAggregateRegionObserver.INDEX_RETRY_VERIFY);
+        Long lastVerifyTime = lastVerifyTimeValue == null ? 0 : Bytes.toLong(lastVerifyTimeValue);
+        if(indexRowKeyforReadRepair != null || lastVerifyTime == 0 || shouldVerifyCheckDone) {
+            return true;
+        }
+
+        IndexToolVerificationResult verificationResultTemp = verificationResultRepository
+                .getVerificationResult(lastVerifyTime, scan, region, indexMaintainer.getIndexTableName()) ;
+        if(verificationResultTemp != null) {
+            verificationResult = verificationResultTemp;
+        }
+        shouldVerifyCheckDone = true;
+        return verificationResultTemp == null;
+    }
+
+    @Override
+    public HRegionInfo getRegionInfo() {
+        return region.getRegionInfo();
+    }
+
+    @Override
+    public boolean isFilterDone() {
+        return false;
+    }
+
+    private void closeTables() throws IOException {
+        hTableFactory.shutdown();
+        if (indexHTable != null) {
+            indexHTable.close();
+        }
+        if (dataHTable != null) {
+            dataHTable.close();
+        }
+    }
+    @Override
+    public void close() throws IOException {
+        innerScanner.close();
+        if (indexRowKeyforReadRepair != null) {
+            closeTables();
+            return;
+        }
+        if (verify) {
+            try {
+                if (verificationResultRepository != null) {
+                    verificationResultRepository.logToIndexToolResultTable(verificationResult,
+                            verifyType, region.getRegionInfo().getRegionName(), skipped);
+                }
+            } finally {
+                this.pool.stop("IndexRegionObserverRegionScanner is closing");
+                closeTables();
+                if (verificationResultRepository != null) {
+                    verificationResultRepository.close();
+                }
+                if (verificationOutputRepository != null) {
+                    verificationOutputRepository.close();
+                }
+            }
+        }
+        else {
+            this.pool.stop("IndexRegionObserverRegionScanner is closing");
+            closeTables();
+        }
+    }
+
+    @VisibleForTesting
+    public int setIndexTableTTL(int ttl) {
+        indexTableTTL = ttl;
+        return 0;

Review comment:
       What is the purpose of always returning 0?

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
##########
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME;
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.filter.SkipScanFilter;
+import org.apache.phoenix.hbase.index.parallel.Task;
+import org.apache.phoenix.hbase.index.parallel.TaskBatch;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+

Review comment:
       Most of the other classes have small descriptions of what they do. Let's add one here as well.

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
##########
@@ -240,4 +320,1033 @@ protected boolean isColumnIncluded(Cell cell) {
         byte[] qualifier = CellUtil.cloneQualifier(cell);
         return set.contains(qualifier);
     }
+    @VisibleForTesting
+    public boolean shouldVerify(IndexTool.IndexVerifyType verifyType,
+                                byte[] indexRowKey, Scan scan, Region region, IndexMaintainer indexMaintainer,
+                                IndexVerificationResultRepository verificationResultRepository, boolean shouldVerifyCheckDone) throws IOException {
+        this.verifyType = verifyType;
+        this.indexRowKeyforReadRepair = indexRowKey;
+        this.scan = scan;
+        this.region = region;
+        this.indexMaintainer = indexMaintainer;
+        this.verificationResultRepository = verificationResultRepository;
+        this.shouldVerifyCheckDone = shouldVerifyCheckDone;
+        return shouldVerify();
+    }
+
+    protected boolean shouldVerify() throws IOException {
+        // In case of read repair, proceed with rebuild
+        // All other types of rebuilds/verification should be incrementally performed if appropriate param is passed
+        byte[] lastVerifyTimeValue = scan.getAttribute(UngroupedAggregateRegionObserver.INDEX_RETRY_VERIFY);
+        Long lastVerifyTime = lastVerifyTimeValue == null ? 0 : Bytes.toLong(lastVerifyTimeValue);
+        if(indexRowKeyforReadRepair != null || lastVerifyTime == 0 || shouldVerifyCheckDone) {

Review comment:
       Once this function is called, shouldVerifyCheckDone is set to true, and from that point on it stays true.
   Even in the incremental case we will then always verify, so why do we check lastVerifyTime at all? And if lastVerifyTime is not 0, the shouldVerify check will not be set to true.
   Can we use just one member variable to decide whether to verify, instead of several, to make this easier to follow?
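A sketch of the single-variable shape this comment suggests (names are hypothetical; the real method also consults the scan attributes and the verification result repository, which `repositoryHasResult` stands in for here):

```java
// Simplified model of shouldVerify() with the incremental decision cached
// in one field instead of being spread across shouldVerifyCheckDone and
// the repository lookup.
public class VerifyDecision {
    static Boolean cachedDecision; // null until the incremental check has run once

    static boolean shouldVerify(boolean readRepair, long lastVerifyTime,
                                boolean repositoryHasResult) {
        if (readRepair || lastVerifyTime == 0) {
            return true; // read repair and non-incremental runs always proceed
        }
        if (cachedDecision == null) {
            // incremental run: verify only if no prior result exists for this region
            cachedDecision = !repositoryHasResult;
        }
        return cachedDecision;
    }

    public static void main(String[] args) {
        System.out.println(shouldVerify(true, 0, false));    // read repair -> true
        System.out.println(shouldVerify(false, 123L, true));  // prior result -> false
        System.out.println(shouldVerify(false, 123L, true));  // cached -> still false
    }
}
```

With the decision cached this way, repeated calls return the same answer the first lookup produced, rather than flipping to true once the check has run.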

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
##########
@@ -240,4 +320,1033 @@ protected boolean isColumnIncluded(Cell cell) {
         byte[] qualifier = CellUtil.cloneQualifier(cell);
         return set.contains(qualifier);
     }
+    @VisibleForTesting
+    public boolean shouldVerify(IndexTool.IndexVerifyType verifyType,
+                                byte[] indexRowKey, Scan scan, Region region, IndexMaintainer indexMaintainer,
+                                IndexVerificationResultRepository verificationResultRepository, boolean shouldVerifyCheckDone) throws IOException {
+        this.verifyType = verifyType;
+        this.indexRowKeyforReadRepair = indexRowKey;
+        this.scan = scan;
+        this.region = region;
+        this.indexMaintainer = indexMaintainer;
+        this.verificationResultRepository = verificationResultRepository;
+        this.shouldVerifyCheckDone = shouldVerifyCheckDone;
+        return shouldVerify();
+    }
+
+    protected boolean shouldVerify() throws IOException {
+        // In case of read repair, proceed with rebuild
+        // All other types of rebuilds/verification should be incrementally performed if appropriate param is passed
+        byte[] lastVerifyTimeValue = scan.getAttribute(UngroupedAggregateRegionObserver.INDEX_RETRY_VERIFY);
+        Long lastVerifyTime = lastVerifyTimeValue == null ? 0 : Bytes.toLong(lastVerifyTimeValue);
+        if(indexRowKeyforReadRepair != null || lastVerifyTime == 0 || shouldVerifyCheckDone) {
+            return true;
+        }
+
+        IndexToolVerificationResult verificationResultTemp = verificationResultRepository
+                .getVerificationResult(lastVerifyTime, scan, region, indexMaintainer.getIndexTableName()) ;
+        if(verificationResultTemp != null) {
+            verificationResult = verificationResultTemp;
+        }
+        shouldVerifyCheckDone = true;
+        return verificationResultTemp == null;
+    }
+
+    @Override
+    public HRegionInfo getRegionInfo() {
+        return region.getRegionInfo();
+    }
+
+    @Override
+    public boolean isFilterDone() {
+        return false;
+    }
+
+    private void closeTables() throws IOException {
+        hTableFactory.shutdown();
+        if (indexHTable != null) {
+            indexHTable.close();
+        }
+        if (dataHTable != null) {
+            dataHTable.close();
+        }
+    }
+    @Override
+    public void close() throws IOException {
+        innerScanner.close();
+        if (indexRowKeyforReadRepair != null) {
+            closeTables();
+            return;
+        }
+        if (verify) {
+            try {
+                if (verificationResultRepository != null) {
+                    verificationResultRepository.logToIndexToolResultTable(verificationResult,
+                            verifyType, region.getRegionInfo().getRegionName(), skipped);
+                }
+            } finally {
+                this.pool.stop("IndexRegionObserverRegionScanner is closing");
+                closeTables();
+                if (verificationResultRepository != null) {
+                    verificationResultRepository.close();
+                }
+                if (verificationOutputRepository != null) {
+                    verificationOutputRepository.close();
+                }
+            }
+        }
+        else {
+            this.pool.stop("IndexRegionObserverRegionScanner is closing");
+            closeTables();
+        }
+    }
+
+    @VisibleForTesting
+    public int setIndexTableTTL(int ttl) {
+        indexTableTTL = ttl;
+        return 0;
+    }
+
+    @VisibleForTesting
+    public int setIndexMaintainer(IndexMaintainer indexMaintainer) {
+        this.indexMaintainer = indexMaintainer;
+        return 0;
+    }
+
+    @VisibleForTesting
+    public long setMaxLookBackInMills(long maxLookBackInMills) {
+        this.maxLookBackInMills = maxLookBackInMills;
+        return 0;
+    }
+
+    public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
+                                          String errorMsg, boolean isBeforeRebuild,
+                                          IndexVerificationOutputRepository.IndexVerificationErrorType errorType) throws IOException {
+        logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs, errorMsg, null,
+                null, isBeforeRebuild, errorType);
+    }
+
+    @VisibleForTesting
+    public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
+                                          String errorMsg, byte[] expectedValue, byte[] actualValue, boolean isBeforeRebuild,
+                                          IndexVerificationOutputRepository.IndexVerificationErrorType errorType) throws IOException {
+        ungroupedAggregateRegionObserver.checkForRegionClosing();
+        verificationOutputRepository.logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs,
+                errorMsg, expectedValue, actualValue, scan.getTimeRange().getMax(),
+                region.getRegionInfo().getTable().getName(), isBeforeRebuild, errorType);
+    }
+
+    private static Cell getCell(Mutation m, byte[] family, byte[] qualifier) {
+        List<Cell> cellList = m.getFamilyCellMap().get(family);
+        if (cellList == null) {
+            return null;
+        }
+        for (Cell cell : cellList) {
+            if (CellUtil.matchingQualifier(cell, qualifier)) {
+                return cell;
+            }
+        }
+        return null;
+    }
+
+    private void logMismatch(Mutation expected, Mutation actual, int iteration, IndexToolVerificationResult.PhaseResult verificationPhaseResult, boolean isBeforeRebuild) throws IOException {
+        if (getTimestamp(expected) != getTimestamp(actual)) {
+            String errorMsg = "Not matching timestamp";
+            byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
+            logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual),
+                    errorMsg, null, null, isBeforeRebuild, INVALID_ROW);
+            return;
+        }
+        int expectedCellCount = 0;
+        for (List<Cell> cells : expected.getFamilyCellMap().values()) {
+            if (cells == null) {
+                continue;
+            }
+            for (Cell expectedCell : cells) {
+                expectedCellCount++;
+                byte[] family = CellUtil.cloneFamily(expectedCell);
+                byte[] qualifier = CellUtil.cloneQualifier(expectedCell);
+                Cell actualCell = getCell(actual, family, qualifier);
+                if (actualCell == null ||
+                        !CellUtil.matchingType(expectedCell, actualCell)) {
+                    byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
+                    String errorMsg = "Missing cell (in iteration " + iteration + ") " + Bytes.toString(family) + ":" + Bytes.toString(qualifier);
+                    logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected),
+                            getTimestamp(actual), errorMsg, isBeforeRebuild, INVALID_ROW);
+                    verificationPhaseResult.setIndexHasMissingCellsCount(verificationPhaseResult.getIndexHasMissingCellsCount() + 1);
+                    return;
+                }
+                if (!CellUtil.matchingValue(actualCell, expectedCell)) {
+                    String errorMsg = "Not matching value (in iteration " + iteration + ") for " + Bytes.toString(family) + ":" + Bytes.toString(qualifier);

Review comment:
       The family:qualifier will not be very useful once we do PHOENIX-5928.

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
##########
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME;
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.filter.SkipScanFilter;
+import org.apache.phoenix.hbase.index.parallel.Task;
+import org.apache.phoenix.hbase.index.parallel.TaskBatch;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+public class IndexRepairRegionScanner extends GlobalIndexRegionScanner {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(IndexRepairRegionScanner.class);
+
+    public IndexRepairRegionScanner(final RegionScanner innerScanner,
+                                     final Region region,
+                                     final Scan scan,
+                                     final RegionCoprocessorEnvironment env,
+                                     final UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver)
+            throws IOException {
+        super(innerScanner, region, scan, env, ungroupedAggregateRegionObserver);
+
+        byte[] dataTableName = scan.getAttribute(PHYSICAL_DATA_TABLE_NAME);
+        dataHTable = hTableFactory.getTable(new ImmutableBytesPtr(dataTableName));
+        indexTableTTL = region.getTableDesc().getColumnFamilies()[0].getTimeToLive();
+        try (org.apache.hadoop.hbase.client.Connection connection =
+                     HBaseFactoryProvider.getHConnectionFactory().createConnection(env.getConfiguration())) {
+            regionEndKeys = connection.getRegionLocator(dataHTable.getName()).getEndKeys();
+        }
+    }
+
+    public void prepareExpectedIndexMutations(Result dataRow, Map<byte[], List<Mutation>> expectedIndexMutationMap) throws IOException {
+        Put put = null;
+        Delete del = null;
+        for (Cell cell : dataRow.rawCells()) {
+            if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
+                if (put == null) {
+                    put = new Put(CellUtil.cloneRow(cell));
+                }
+                put.add(cell);
+            } else {
+                if (del == null) {
+                    del = new Delete(CellUtil.cloneRow(cell));
+                }
+                del.addDeleteMarker(cell);
+            }
+        }
+        List<Mutation> indexMutations = prepareIndexMutationsForRebuild(indexMaintainer, put, del);
+        Collections.reverse(indexMutations);
+        for (Mutation mutation : indexMutations) {
+            byte[] indexRowKey = mutation.getRow();
+            List<Mutation> mutationList = expectedIndexMutationMap.get(indexRowKey);
+            if (mutationList == null) {
+                mutationList = new ArrayList<>();
+                mutationList.add(mutation);
+                expectedIndexMutationMap.put(indexRowKey, mutationList);
+            } else {
+                mutationList.add(mutation);
+            }
+        }
+    }
+
+    private void repairIndexRows(Map<byte[], List<Mutation>> indexMutationMap,
+                                 List<Mutation> indexRowsToBeDeleted,
+                                 IndexToolVerificationResult verificationResult) throws IOException {
+        try {
+            int batchSize = 0;
+            List<Mutation> indexUpdates = new ArrayList<Mutation>(maxBatchSize);
+            for (List<Mutation> mutationList : indexMutationMap.values()) {
+                indexUpdates.addAll(mutationList);
+                batchSize += mutationList.size();
+                if (batchSize >= maxBatchSize) {
+                    ungroupedAggregateRegionObserver.checkForRegionClosing();
+                    region.batchMutate(indexUpdates.toArray(new Mutation[indexUpdates.size()]),
+                            HConstants.NO_NONCE, HConstants.NO_NONCE);
+                    batchSize = 0;
+                    indexUpdates = new ArrayList<Mutation>(maxBatchSize);
+                }
+            }
+            if (batchSize > 0) {
+                ungroupedAggregateRegionObserver.checkForRegionClosing();
+                region.batchMutate(indexUpdates.toArray(new Mutation[indexUpdates.size()]),
+                        HConstants.NO_NONCE, HConstants.NO_NONCE);
+            }
+            batchSize = 0;
+            indexUpdates = new ArrayList<Mutation>(maxBatchSize);
+            for (Mutation mutation : indexRowsToBeDeleted) {
+                indexUpdates.add(mutation);
+                batchSize ++;
+                if (batchSize >= maxBatchSize) {
+                    ungroupedAggregateRegionObserver.checkForRegionClosing();
+                    region.batchMutate(indexUpdates.toArray(new Mutation[indexUpdates.size()]),
+                            HConstants.NO_NONCE, HConstants.NO_NONCE);
+                    batchSize = 0;
+                    indexUpdates = new ArrayList<Mutation>(maxBatchSize);
+                }
+            }
+            if (batchSize > 0) {
+                ungroupedAggregateRegionObserver.checkForRegionClosing();
+                region.batchMutate(indexUpdates.toArray(new Mutation[indexUpdates.size()]),
+                        HConstants.NO_NONCE, HConstants.NO_NONCE);

Review comment:
       +1

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
##########
@@ -1103,13 +1104,21 @@ private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Reg
             if (oldCoproc) {
                 return new IndexerRegionScanner(scanner, region, scan, env, this);
             } else {
-                return new IndexRebuildRegionScanner(scanner, region, scan, env, this);
+                if (region.getTableDesc().hasCoprocessor(IndexRegionObserver.class.getCanonicalName())) {

Review comment:
       Don't we always have IndexRegionObserver with the new design? What is the case where the index has neither Indexer nor IndexRegionObserver?







[GitHub] [phoenix] stoty commented on pull request #915: PHOENIX-6181 IndexRepairRegionScanner to verify and repair every glob…

Posted by GitBox <gi...@apache.org>.
stoty commented on pull request #915:
URL: https://github.com/apache/phoenix/pull/915#issuecomment-710628700


   :broken_heart: **-1 overall**
   
   
   
   
   
   
   | Vote | Subsystem | Runtime | Comment |
   |:----:|----------:|--------:|:--------|
   | +0 :ok: |  reexec  |   5m 28s |  Docker mode activated.  |
   ||| _ Prechecks _ |
   | +1 :green_heart: |  dupname  |   0m  0s |  No case conflicting files found.  |
   | +1 :green_heart: |  hbaseanti  |   0m  0s |  Patch does not have any anti-patterns.  |
   | +1 :green_heart: |  @author  |   0m  0s |  The patch does not contain any @author tags.  |
   | +1 :green_heart: |  test4tests  |   0m  0s |  The patch appears to include 2 new or modified test files.  |
   ||| _ 4.x Compile Tests _ |
   | +1 :green_heart: |  mvninstall  |  32m 29s |  4.x passed  |
   | +1 :green_heart: |  compile  |   1m  1s |  4.x passed  |
   | +1 :green_heart: |  checkstyle  |   0m 59s |  4.x passed  |
   | +1 :green_heart: |  javadoc  |   0m 47s |  4.x passed  |
   | +0 :ok: |  spotbugs  |   3m  9s |  phoenix-core in 4.x has 956 extant spotbugs warnings.  |
   ||| _ Patch Compile Tests _ |
   | +1 :green_heart: |  mvninstall  |  28m 50s |  the patch passed  |
   | +1 :green_heart: |  compile  |   0m 58s |  the patch passed  |
   | +1 :green_heart: |  javac  |   0m 58s |  the patch passed  |
   | -1 :x: |  checkstyle  |   1m  3s |  phoenix-core: The patch generated 582 new + 1420 unchanged - 386 fixed = 2002 total (was 1806)  |
   | +1 :green_heart: |  whitespace  |   0m  0s |  The patch has no whitespace issues.  |
   | +1 :green_heart: |  javadoc  |   0m 46s |  the patch passed  |
   | -1 :x: |  spotbugs  |   3m 23s |  phoenix-core generated 1 new + 955 unchanged - 1 fixed = 956 total (was 956)  |
   ||| _ Other Tests _ |
   | -1 :x: |  unit  |   1m 25s |  phoenix-core in the patch failed.  |
   | +1 :green_heart: |  asflicense  |   0m  9s |  The patch does not generate ASF License warnings.  |
   |  |   |  81m 51s |   |
   
   
   | Reason | Tests |
   |-------:|:------|
   | FindBugs | module:phoenix-core |
   |  |  org.apache.phoenix.coprocessor.GlobalIndexRegionScanner.shouldVerify(IndexTool$IndexVerifyType, byte[], Scan, Region, IndexMaintainer, IndexVerificationResultRepository, boolean) may expose internal representation by storing an externally mutable object into GlobalIndexRegionScanner.indexRowKeyforReadRepair  At GlobalIndexRegionScanner.java:IndexVerificationResultRepository, boolean) may expose internal representation by storing an externally mutable object into GlobalIndexRegionScanner.indexRowKeyforReadRepair  At GlobalIndexRegionScanner.java:[line 333] |
   | Failed junit tests | phoenix.index.ShouldVerifyTest |
   
   
   | Subsystem | Report/Notes |
   |----------:|:-------------|
   | Docker | ClientAPI=1.40 ServerAPI=1.40 base: https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-PreCommit-GitHub-PR/job/PR-915/3/artifact/yetus-general-check/output/Dockerfile |
   | GITHUB PR | https://github.com/apache/phoenix/pull/915 |
   | Optional Tests | dupname asflicense javac javadoc unit spotbugs hbaseanti checkstyle compile |
   | uname | Linux 1e8a21fbdd5a 4.15.0-112-generic #113-Ubuntu SMP Thu Jul 9 23:41:39 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux |
   | Build tool | maven |
   | Personality | dev/phoenix-personality.sh |
   | git revision | 4.x / 2359f54 |
   | Default Java | Private Build-1.8.0_242-8u242-b08-0ubuntu3~16.04-b08 |
   | checkstyle | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-PreCommit-GitHub-PR/job/PR-915/3/artifact/yetus-general-check/output/diff-checkstyle-phoenix-core.txt |
   | spotbugs | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-PreCommit-GitHub-PR/job/PR-915/3/artifact/yetus-general-check/output/new-spotbugs-phoenix-core.html |
   | unit | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-PreCommit-GitHub-PR/job/PR-915/3/artifact/yetus-general-check/output/patch-unit-phoenix-core.txt |
   |  Test Results | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-PreCommit-GitHub-PR/job/PR-915/3/testReport/ |
   | Max. process+thread count | 447 (vs. ulimit of 30000) |
   | modules | C: phoenix-core U: phoenix-core |
   | Console output | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-PreCommit-GitHub-PR/job/PR-915/3/console |
   | versions | git=2.7.4 maven=3.3.9 spotbugs=4.1.3 |
   | Powered by | Apache Yetus 0.12.0 https://yetus.apache.org |
   
   
   This message was automatically generated.
   
   





[GitHub] [phoenix] tkhurana commented on a change in pull request #915: PHOENIX-6181 IndexRepairRegionScanner to verify and repair every glob…

Posted by GitBox <gi...@apache.org>.
tkhurana commented on a change in pull request #915:
URL: https://github.com/apache/phoenix/pull/915#discussion_r504338861



##########
File path: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
##########
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME;
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.filter.SkipScanFilter;
+import org.apache.phoenix.hbase.index.parallel.Task;
+import org.apache.phoenix.hbase.index.parallel.TaskBatch;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+public class IndexRepairRegionScanner extends GlobalIndexRegionScanner {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(IndexRepairRegionScanner.class);
+
+    public IndexRepairRegionScanner(final RegionScanner innerScanner,
+                                     final Region region,
+                                     final Scan scan,
+                                     final RegionCoprocessorEnvironment env,
+                                     final UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver)
+            throws IOException {
+        super(innerScanner, region, scan, env, ungroupedAggregateRegionObserver);
+
+        byte[] dataTableName = scan.getAttribute(PHYSICAL_DATA_TABLE_NAME);
+        dataHTable = hTableFactory.getTable(new ImmutableBytesPtr(dataTableName));
+        indexTableTTL = region.getTableDesc().getColumnFamilies()[0].getTimeToLive();
+        try (org.apache.hadoop.hbase.client.Connection connection =
+                     HBaseFactoryProvider.getHConnectionFactory().createConnection(env.getConfiguration())) {
+            regionEndKeys = connection.getRegionLocator(dataHTable.getName()).getEndKeys();
+        }
+    }
+
+    public void prepareExpectedIndexMutations(Result dataRow, Map<byte[], List<Mutation>> expectedIndexMutationMap) throws IOException {
+        Put put = null;
+        Delete del = null;
+        for (Cell cell : dataRow.rawCells()) {
+            if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
+                if (put == null) {
+                    put = new Put(CellUtil.cloneRow(cell));
+                }
+                put.add(cell);
+            } else {
+                if (del == null) {
+                    del = new Delete(CellUtil.cloneRow(cell));
+                }
+                del.addDeleteMarker(cell);
+            }
+        }
+        List<Mutation> indexMutations = prepareIndexMutationsForRebuild(indexMaintainer, put, del);
+        Collections.reverse(indexMutations);
+        for (Mutation mutation : indexMutations) {
+            byte[] indexRowKey = mutation.getRow();
+            List<Mutation> mutationList = expectedIndexMutationMap.get(indexRowKey);
+            if (mutationList == null) {
+                mutationList = new ArrayList<>();
+                mutationList.add(mutation);
+                expectedIndexMutationMap.put(indexRowKey, mutationList);
+            } else {
+                mutationList.add(mutation);
+            }
+        }
+    }
+
+    private void repairIndexRows(Map<byte[], List<Mutation>> indexMutationMap,
+                                 List<Mutation> indexRowsToBeDeleted,
+                                 IndexToolVerificationResult verificationResult) throws IOException {
+        try {
+            int batchSize = 0;
+            List<Mutation> indexUpdates = new ArrayList<Mutation>(maxBatchSize);
+            for (List<Mutation> mutationList : indexMutationMap.values()) {
+                indexUpdates.addAll(mutationList);
+                batchSize += mutationList.size();
+                if (batchSize >= maxBatchSize) {
+                    ungroupedAggregateRegionObserver.checkForRegionClosing();
+                    region.batchMutate(indexUpdates.toArray(new Mutation[indexUpdates.size()]),
+                            HConstants.NO_NONCE, HConstants.NO_NONCE);
+                    batchSize = 0;
+                    indexUpdates = new ArrayList<Mutation>(maxBatchSize);
+                }
+            }
+            if (batchSize > 0) {
+                ungroupedAggregateRegionObserver.checkForRegionClosing();
+                region.batchMutate(indexUpdates.toArray(new Mutation[indexUpdates.size()]),
+                        HConstants.NO_NONCE, HConstants.NO_NONCE);
+            }
+            batchSize = 0;
+            indexUpdates = new ArrayList<Mutation>(maxBatchSize);
+            for (Mutation mutation : indexRowsToBeDeleted) {
+                indexUpdates.add(mutation);
+                batchSize ++;
+                if (batchSize >= maxBatchSize) {
+                    ungroupedAggregateRegionObserver.checkForRegionClosing();
+                    region.batchMutate(indexUpdates.toArray(new Mutation[indexUpdates.size()]),
+                            HConstants.NO_NONCE, HConstants.NO_NONCE);
+                    batchSize = 0;
+                    indexUpdates = new ArrayList<Mutation>(maxBatchSize);
+                }
+            }
+            if (batchSize > 0) {
+                ungroupedAggregateRegionObserver.checkForRegionClosing();
+                region.batchMutate(indexUpdates.toArray(new Mutation[indexUpdates.size()]),
+                        HConstants.NO_NONCE, HConstants.NO_NONCE);
+            }
+            if (verify) {
+                verificationResult.setRebuiltIndexRowCount(verificationResult.getRebuiltIndexRowCount() + indexMutationMap.size());
+            }
+        } catch (Throwable t) {
+            ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
+        }
+    }
+
+    private Map<byte[], List<Mutation>> populateExpectedIndexMutationMap(Set<byte[]> dataRowKeys) throws IOException {
+        Map<byte[], List<Mutation>> expectedIndexMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+        List<KeyRange> keys = new ArrayList<>(dataRowKeys.size());
+        for (byte[] indexKey: dataRowKeys) {
+            keys.add(PVarbinary.INSTANCE.getKeyRange(indexKey));
+        }
+        ScanRanges scanRanges = ScanRanges.createPointLookup(keys);
+        Scan dataScan = new Scan();
+        dataScan.setTimeRange(scan.getTimeRange().getMin(), scan.getTimeRange().getMax());
+        scanRanges.initializeScan(dataScan);
+        SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
+        dataScan.setFilter(new SkipScanFilter(skipScanFilter, true));
+        dataScan.setRaw(true);
+        dataScan.setMaxVersions();
+        dataScan.setCacheBlocks(false);
+        try (ResultScanner resultScanner = dataHTable.getScanner(dataScan)) {
+            for (Result result = resultScanner.next(); (result != null); result = resultScanner.next()) {
+                ungroupedAggregateRegionObserver.checkForRegionClosing();
+                prepareExpectedIndexMutations(result, expectedIndexMutationMap);
+            }
+        } catch (Throwable t) {
+            ServerUtil.throwIOException(dataHTable.getName().toString(), t);
+        }
+        return expectedIndexMutationMap;
+    }
+
+    private Map<byte[], List<Mutation>> populateActualIndexMutationMap(Map<byte[], List<Mutation>> expectedIndexMutationMap) throws IOException {
+        Map<byte[], List<Mutation>> actualIndexMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+        Scan indexScan = prepareIndexScan(expectedIndexMutationMap);
+        try (RegionScanner regionScanner = region.getScanner(indexScan)) {
+            do {
+                ungroupedAggregateRegionObserver.checkForRegionClosing();
+                List<Cell> row = new ArrayList<Cell>();
+                hasMore = regionScanner.nextRaw(row);
+                if (!row.isEmpty()) {
+                    populateIndexMutationFromIndexRow(row, actualIndexMutationMap);
+                }
+            } while (hasMore);
+        } catch (Throwable t) {
+            ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
+        }
+        return actualIndexMutationMap;
+    }
+
+    private void repairAndOrVerifyIndexRows(Set<byte[]> dataRowKeys,
+                                            Map<byte[], List<Mutation>> actualIndexMutationMap,
+                                            IndexToolVerificationResult verificationResult) throws IOException {
+        List<Mutation> indexRowsToBeDeleted = new ArrayList<>();
+        Map<byte[], List<Mutation>> expectedIndexMutationMap = populateExpectedIndexMutationMap(dataRowKeys);
+        if (verifyType == IndexTool.IndexVerifyType.NONE) {
+            repairIndexRows(expectedIndexMutationMap, indexRowsToBeDeleted, verificationResult);
+            return;
+        }
+        if (verifyType == IndexTool.IndexVerifyType.ONLY) {
+            verifyIndexRows(actualIndexMutationMap, expectedIndexMutationMap, Collections.EMPTY_SET, Collections.EMPTY_LIST, verificationResult.getBefore(), true);
+            return;
+        }
+        if (verifyType == IndexTool.IndexVerifyType.BEFORE) {
+            verifyIndexRows(actualIndexMutationMap, expectedIndexMutationMap, Collections.EMPTY_SET, indexRowsToBeDeleted, verificationResult.getBefore(), true);
+            if (!expectedIndexMutationMap.isEmpty() || !indexRowsToBeDeleted.isEmpty()) {
+                repairIndexRows(expectedIndexMutationMap, indexRowsToBeDeleted, verificationResult);
+            }
+            return;
+        }
+        if (verifyType == IndexTool.IndexVerifyType.AFTER) {

Review comment:
       The AFTER option will not remove the extra verified rows from the index table. Same with the NONE option. Does it make sense to have these options when using the index table as the source?
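
       As an aside, the flush-at-threshold batching that `repairIndexRows` in the quoted file applies to both the expected mutations and the deletes can be sketched in isolation. The `List<String>` groups stand in for the per-row mutation lists and the recorded sizes stand in for calls to `region.batchMutate(...)`; none of the real HBase types are used here:

       ```java
       import java.util.ArrayList;
       import java.util.List;

       public class BatchFlushSketch {
           // Mirrors the flush-at-maxBatchSize loop in repairIndexRows: groups of
           // mutations are appended whole, the batch is "flushed" (here: its size
           // recorded) once it reaches the threshold, and any remainder is
           // flushed at the end.
           static List<Integer> applyInBatches(List<List<String>> groups, int maxBatchSize) {
               List<Integer> flushSizes = new ArrayList<>();
               List<String> batch = new ArrayList<>(maxBatchSize);
               for (List<String> group : groups) {
                   batch.addAll(group);
                   if (batch.size() >= maxBatchSize) {
                       flushSizes.add(batch.size());   // stand-in for batchMutate(...)
                       batch = new ArrayList<>(maxBatchSize);
                   }
               }
               if (!batch.isEmpty()) {
                   flushSizes.add(batch.size());       // final partial batch
               }
               return flushSizes;
           }

           public static void main(String[] args) {
               // Three per-row groups of sizes 2, 1, 2 with a threshold of 3.
               System.out.println(applyInBatches(
                       List.of(List.of("a", "b"), List.of("c"), List.of("d", "e")), 3));
           }
       }
       ```

       Note that, as in the quoted code, a single batch can exceed `maxBatchSize` when one row's mutation list is large, because each group is appended whole before the threshold check.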







[GitHub] [phoenix] stoty commented on pull request #915: PHOENIX-6181 IndexRepairRegionScanner to verify and repair every glob…

Posted by GitBox <gi...@apache.org>.
stoty commented on pull request #915:
URL: https://github.com/apache/phoenix/pull/915#issuecomment-705327370


   :broken_heart: **-1 overall**
   
   
   
   
   
   
   | Vote | Subsystem | Runtime | Comment |
   |:----:|----------:|--------:|:--------|
   | +0 :ok: |  reexec  |   6m 24s |  Docker mode activated.  |
   ||| _ Prechecks _ |
   | +1 :green_heart: |  dupname  |   0m  0s |  No case conflicting files found.  |
   | +1 :green_heart: |  hbaseanti  |   0m  0s |  Patch does not have any anti-patterns.  |
   | +1 :green_heart: |  @author  |   0m  0s |  The patch does not contain any @author tags.  |
   | +1 :green_heart: |  test4tests  |   0m  0s |  The patch appears to include 2 new or modified test files.  |
   ||| _ 4.x Compile Tests _ |
   | +1 :green_heart: |  mvninstall  |   8m 39s |  4.x passed  |
   | +1 :green_heart: |  compile  |   0m 53s |  4.x passed  |
   | +1 :green_heart: |  checkstyle  |   1m 13s |  4.x passed  |
   | +1 :green_heart: |  javadoc  |   0m 45s |  4.x passed  |
   | +0 :ok: |  spotbugs  |   2m 54s |  phoenix-core in 4.x has 957 extant spotbugs warnings.  |
   ||| _ Patch Compile Tests _ |
   | +1 :green_heart: |  mvninstall  |   4m 54s |  the patch passed  |
   | +1 :green_heart: |  compile  |   0m 56s |  the patch passed  |
   | +1 :green_heart: |  javac  |   0m 56s |  the patch passed  |
   | -1 :x: |  checkstyle  |   1m 21s |  phoenix-core: The patch generated 542 new + 1433 unchanged - 372 fixed = 1975 total (was 1805)  |
   | +1 :green_heart: |  whitespace  |   0m  0s |  The patch has no whitespace issues.  |
   | +1 :green_heart: |  javadoc  |   0m 41s |  the patch passed  |
   | -1 :x: |  spotbugs  |   3m  3s |  phoenix-core generated 1 new + 956 unchanged - 1 fixed = 957 total (was 957)  |
   ||| _ Other Tests _ |
   | -1 :x: |  unit  |   1m 12s |  phoenix-core in the patch failed.  |
   | +1 :green_heart: |  asflicense  |   0m 10s |  The patch does not generate ASF License warnings.  |
   |  |   |  34m 35s |   |
   
   
   | Reason | Tests |
   |-------:|:------|
   | FindBugs | module:phoenix-core |
   |  |  org.apache.phoenix.coprocessor.GlobalIndexRegionScanner.shouldVerify(IndexTool$IndexVerifyType, byte[], Scan, Region, IndexMaintainer, IndexVerificationResultRepository, boolean) may expose internal representation by storing an externally mutable object into GlobalIndexRegionScanner.indexRowKeyforReadRepair  At GlobalIndexRegionScanner.java:IndexVerificationResultRepository, boolean) may expose internal representation by storing an externally mutable object into GlobalIndexRegionScanner.indexRowKeyforReadRepair  At GlobalIndexRegionScanner.java:[line 328] |
   | Failed junit tests | phoenix.index.ShouldVerifyTest |
   
   
   | Subsystem | Report/Notes |
   |----------:|:-------------|
   | Docker | ClientAPI=1.40 ServerAPI=1.40 base: https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-PreCommit-GitHub-PR/job/PR-915/1/artifact/yetus-general-check/output/Dockerfile |
   | GITHUB PR | https://github.com/apache/phoenix/pull/915 |
   | Optional Tests | dupname asflicense javac javadoc unit spotbugs hbaseanti checkstyle compile |
   | uname | Linux b294a7ca6906 4.15.0-112-generic #113-Ubuntu SMP Thu Jul 9 23:41:39 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux |
   | Build tool | maven |
   | Personality | dev/phoenix-personality.sh |
   | git revision | 4.x / 3008ca9 |
   | Default Java | Private Build-1.8.0_242-8u242-b08-0ubuntu3~16.04-b08 |
   | checkstyle | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-PreCommit-GitHub-PR/job/PR-915/1/artifact/yetus-general-check/output/diff-checkstyle-phoenix-core.txt |
   | spotbugs | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-PreCommit-GitHub-PR/job/PR-915/1/artifact/yetus-general-check/output/new-spotbugs-phoenix-core.html |
   | unit | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-PreCommit-GitHub-PR/job/PR-915/1/artifact/yetus-general-check/output/patch-unit-phoenix-core.txt |
   |  Test Results | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-PreCommit-GitHub-PR/job/PR-915/1/testReport/ |
   | Max. process+thread count | 487 (vs. ulimit of 30000) |
   | modules | C: phoenix-core U: phoenix-core |
   | Console output | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-PreCommit-GitHub-PR/job/PR-915/1/console |
   | versions | git=2.7.4 maven=3.3.9 spotbugs=4.1.3 |
   | Powered by | Apache Yetus 0.12.0 https://yetus.apache.org |
   
   
   This message was automatically generated.
   
   





[GitHub] [phoenix] kadirozde commented on a change in pull request #915: PHOENIX-6181 IndexRepairRegionScanner to verify and repair every glob…

Posted by GitBox <gi...@apache.org>.
kadirozde commented on a change in pull request #915:
URL: https://github.com/apache/phoenix/pull/915#discussion_r505789656



##########
File path: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
##########
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME;
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.filter.SkipScanFilter;
+import org.apache.phoenix.hbase.index.parallel.Task;
+import org.apache.phoenix.hbase.index.parallel.TaskBatch;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+public class IndexRepairRegionScanner extends GlobalIndexRegionScanner {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(IndexRepairRegionScanner.class);
+
+    public IndexRepairRegionScanner(final RegionScanner innerScanner,
+                                     final Region region,
+                                     final Scan scan,
+                                     final RegionCoprocessorEnvironment env,
+                                     final UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver)
+            throws IOException {
+        super(innerScanner, region, scan, env, ungroupedAggregateRegionObserver);
+
+        byte[] dataTableName = scan.getAttribute(PHYSICAL_DATA_TABLE_NAME);
+        dataHTable = hTableFactory.getTable(new ImmutableBytesPtr(dataTableName));
+        indexTableTTL = region.getTableDesc().getColumnFamilies()[0].getTimeToLive();
+        try (org.apache.hadoop.hbase.client.Connection connection =
+                     HBaseFactoryProvider.getHConnectionFactory().createConnection(env.getConfiguration())) {
+            regionEndKeys = connection.getRegionLocator(dataHTable.getName()).getEndKeys();
+        }
+    }
+
+    public void prepareExpectedIndexMutations(Result dataRow, Map<byte[], List<Mutation>> expectedIndexMutationMap) throws IOException {
+        Put put = null;
+        Delete del = null;
+        for (Cell cell : dataRow.rawCells()) {
+            if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
+                if (put == null) {
+                    put = new Put(CellUtil.cloneRow(cell));
+                }
+                put.add(cell);
+            } else {
+                if (del == null) {
+                    del = new Delete(CellUtil.cloneRow(cell));
+                }
+                del.addDeleteMarker(cell);
+            }
+        }
+        List<Mutation> indexMutations = prepareIndexMutationsForRebuild(indexMaintainer, put, del);
+        Collections.reverse(indexMutations);
+        for (Mutation mutation : indexMutations) {
+            byte[] indexRowKey = mutation.getRow();
+            List<Mutation> mutationList = expectedIndexMutationMap.get(indexRowKey);
+            if (mutationList == null) {
+                mutationList = new ArrayList<>();
+                mutationList.add(mutation);
+                expectedIndexMutationMap.put(indexRowKey, mutationList);
+            } else {
+                mutationList.add(mutation);
+            }
+        }
+    }
+
+    private void repairIndexRows(Map<byte[], List<Mutation>> indexMutationMap,
+                                 List<Mutation> indexRowsToBeDeleted,
+                                 IndexToolVerificationResult verificationResult) throws IOException {
+        try {
+            int batchSize = 0;
+            List<Mutation> indexUpdates = new ArrayList<Mutation>(maxBatchSize);
+            for (List<Mutation> mutationList : indexMutationMap.values()) {
+                indexUpdates.addAll(mutationList);
+                batchSize += mutationList.size();
+                if (batchSize >= maxBatchSize) {
+                    ungroupedAggregateRegionObserver.checkForRegionClosing();
+                    region.batchMutate(indexUpdates.toArray(new Mutation[indexUpdates.size()]),
+                            HConstants.NO_NONCE, HConstants.NO_NONCE);
+                    batchSize = 0;
+                    indexUpdates = new ArrayList<Mutation>(maxBatchSize);
+                }
+            }
+            if (batchSize > 0) {
+                ungroupedAggregateRegionObserver.checkForRegionClosing();
+                region.batchMutate(indexUpdates.toArray(new Mutation[indexUpdates.size()]),
+                        HConstants.NO_NONCE, HConstants.NO_NONCE);
+            }
+            batchSize = 0;
+            indexUpdates = new ArrayList<Mutation>(maxBatchSize);
+            for (Mutation mutation : indexRowsToBeDeleted) {
+                indexUpdates.add(mutation);
+                batchSize ++;
+                if (batchSize >= maxBatchSize) {
+                    ungroupedAggregateRegionObserver.checkForRegionClosing();
+                    region.batchMutate(indexUpdates.toArray(new Mutation[indexUpdates.size()]),
+                            HConstants.NO_NONCE, HConstants.NO_NONCE);
+                    batchSize = 0;
+                    indexUpdates = new ArrayList<Mutation>(maxBatchSize);
+                }
+            }
+            if (batchSize > 0) {
+                ungroupedAggregateRegionObserver.checkForRegionClosing();
+                region.batchMutate(indexUpdates.toArray(new Mutation[indexUpdates.size()]),
+                        HConstants.NO_NONCE, HConstants.NO_NONCE);
+            }
+            if (verify) {
+                verificationResult.setRebuiltIndexRowCount(verificationResult.getRebuiltIndexRowCount() + indexMutationMap.size());
+            }
+        } catch (Throwable t) {
+            ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
+        }
+    }
+
+    private Map<byte[], List<Mutation>> populateExpectedIndexMutationMap(Set<byte[]> dataRowKeys) throws IOException {
+        Map<byte[], List<Mutation>> expectedIndexMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+        List<KeyRange> keys = new ArrayList<>(dataRowKeys.size());
+        for (byte[] indexKey: dataRowKeys) {
+            keys.add(PVarbinary.INSTANCE.getKeyRange(indexKey));
+        }
+        ScanRanges scanRanges = ScanRanges.createPointLookup(keys);
+        Scan dataScan = new Scan();
+        dataScan.setTimeRange(scan.getTimeRange().getMin(), scan.getTimeRange().getMax());
+        scanRanges.initializeScan(dataScan);
+        SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
+        dataScan.setFilter(new SkipScanFilter(skipScanFilter, true));
+        dataScan.setRaw(true);
+        dataScan.setMaxVersions();
+        dataScan.setCacheBlocks(false);
+        try (ResultScanner resultScanner = dataHTable.getScanner(dataScan)) {
+            for (Result result = resultScanner.next(); (result != null); result = resultScanner.next()) {
+                ungroupedAggregateRegionObserver.checkForRegionClosing();
+                prepareExpectedIndexMutations(result, expectedIndexMutationMap);
+            }
+        } catch (Throwable t) {
+            ServerUtil.throwIOException(dataHTable.getName().toString(), t);
+        }
+        return expectedIndexMutationMap;
+    }
+
+    private Map<byte[], List<Mutation>> populateActualIndexMutationMap(Map<byte[], List<Mutation>> expectedIndexMutationMap) throws IOException {
+        Map<byte[], List<Mutation>> actualIndexMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+        Scan indexScan = prepareIndexScan(expectedIndexMutationMap);
+        try (RegionScanner regionScanner = region.getScanner(indexScan)) {
+            do {
+                ungroupedAggregateRegionObserver.checkForRegionClosing();
+                List<Cell> row = new ArrayList<Cell>();
+                hasMore = regionScanner.nextRaw(row);
+                if (!row.isEmpty()) {
+                    populateIndexMutationFromIndexRow(row, actualIndexMutationMap);
+                }
+            } while (hasMore);
+        } catch (Throwable t) {
+            ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
+        }
+        return actualIndexMutationMap;
+    }
+
+    private void repairAndOrVerifyIndexRows(Set<byte[]> dataRowKeys,
+                                            Map<byte[], List<Mutation>> actualIndexMutationMap,
+                                            IndexToolVerificationResult verificationResult) throws IOException {
+        List<Mutation> indexRowsToBeDeleted = new ArrayList<>();
+        Map<byte[], List<Mutation>> expectedIndexMutationMap = populateExpectedIndexMutationMap(dataRowKeys);
+        if (verifyType == IndexTool.IndexVerifyType.NONE) {
+            repairIndexRows(expectedIndexMutationMap, indexRowsToBeDeleted, verificationResult);
+            return;
+        }
+        if (verifyType == IndexTool.IndexVerifyType.ONLY) {
+            verifyIndexRows(actualIndexMutationMap, expectedIndexMutationMap, Collections.EMPTY_SET, Collections.EMPTY_LIST, verificationResult.getBefore(), true);
+            return;
+        }
+        if (verifyType == IndexTool.IndexVerifyType.BEFORE) {
+            verifyIndexRows(actualIndexMutationMap, expectedIndexMutationMap, Collections.EMPTY_SET, indexRowsToBeDeleted, verificationResult.getBefore(), true);
+            if (!expectedIndexMutationMap.isEmpty() || !indexRowsToBeDeleted.isEmpty()) {
+                repairIndexRows(expectedIndexMutationMap, indexRowsToBeDeleted, verificationResult);
+            }
+            return;
+        }
+        if (verifyType == IndexTool.IndexVerifyType.AFTER) {

Review comment:
       I will leave this decision to be made within IndexTool. I think we can still allow these options and state in IndexTool's help text that they do not remove stale index rows.
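
The expected/actual mutation maps in the diff above are TreeMaps keyed on raw byte[] row keys; that only works because `Maps.newTreeMap(Bytes.BYTES_COMPARATOR)` supplies value-based ordering (`byte[]` itself has identity-based equals/hashCode, so a plain HashMap or an unordered TreeMap would not find keys by content). A minimal stdlib-only sketch of the same pattern, using `java.util.Arrays.compareUnsigned` as a stand-in for HBase's `Bytes.BYTES_COMPARATOR` (the class name `ByteKeyMapSketch` is illustrative, not from the patch):

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.TreeMap;

public class ByteKeyMapSketch {
    // Lexicographic, unsigned comparison of byte arrays -- the same ordering
    // contract HBase's Bytes.BYTES_COMPARATOR provides for row keys.
    static final Comparator<byte[]> BYTES_COMPARATOR = Arrays::compareUnsigned;

    public static void main(String[] args) {
        TreeMap<byte[], String> map = new TreeMap<>(BYTES_COMPARATOR);
        map.put(new byte[] {0x02}, "second");
        map.put(new byte[] {0x01}, "first");

        // A freshly allocated byte[] with the same content finds the entry,
        // because the comparator compares contents rather than references.
        System.out.println(map.get(new byte[] {0x01})); // prints "first"
    }
}
```

Without the comparator, a `TreeMap<byte[], ...>` would throw `ClassCastException` on insert, since `byte[]` does not implement `Comparable`.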




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [phoenix] kadirozde commented on a change in pull request #915: PHOENIX-6181 IndexRepairRegionScanner to verify and repair every glob…

Posted by GitBox <gi...@apache.org>.
kadirozde commented on a change in pull request #915:
URL: https://github.com/apache/phoenix/pull/915#discussion_r504924842



##########
File path: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
##########
@@ -1103,13 +1104,21 @@ private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Reg
             if (oldCoproc) {
                 return new IndexerRegionScanner(scanner, region, scan, env, this);
             } else {
-                return new IndexRebuildRegionScanner(scanner, region, scan, env, this);
+                if (region.getTableDesc().hasCoprocessor(IndexRegionObserver.class.getCanonicalName())) {

Review comment:
       We have not removed Indexer yet. Regarding the PTable comment, this is server-side code, and in general we do not want to access the syscat on the server side, mainly for performance reasons.



