You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2020/10/18 05:06:28 UTC

[phoenix] branch 4.x updated: PHOENIX-6181 Addendum

This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x by this push:
     new 4fc394c  PHOENIX-6181 Addendum
4fc394c is described below

commit 4fc394c3d9c92cbefa20800dd642df20524933a4
Author: Kadir Ozdemir <ko...@salesforce.com>
AuthorDate: Fri Oct 16 20:11:53 2020 -0700

    PHOENIX-6181 Addendum
---
 .../end2end/ConcurrentMutationsExtendedIT.java     |  6 ----
 .../coprocessor/GlobalIndexRegionScanner.java      |  5 +---
 .../phoenix/coprocessor/IndexerRegionScanner.java  |  5 ----
 .../UngroupedAggregateRegionObserver.java          | 35 +++++++++++-----------
 4 files changed, 18 insertions(+), 33 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
index 0a52c66..c2e0dc4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
@@ -33,7 +32,6 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.index.IndexTool;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.util.*;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -45,7 +43,6 @@ import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.phoenix.mapreduce.PhoenixJobCounters.INPUT_RECORDS;
 import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT;
 import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT;
 import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT;
@@ -53,15 +50,12 @@ import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFT
 import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_VALID_INDEX_ROW_COUNT;
 import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT;
 import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT;
 import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT;
 import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT;
 import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_OLD_INDEX_ROW_COUNT;
 import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT;
 import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT;
 import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.SCANNED_DATA_ROW_COUNT;
 import static org.junit.Assert.*;
 
 @RunWith(RunUntilFailure.class) @Category(NeedsOwnMiniClusterTest.class)
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
index e1eb9ff..5005339 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
@@ -66,7 +66,6 @@ import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.IndexUtil;
-import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -115,7 +114,6 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
     public static final String PHOENIX_INDEX_MR_LOG_BEYOND_MAX_LOOKBACK_ERRORS =
             "phoenix.index.mr.log.beyond.max.lookback.errors";
     public static final boolean DEFAULT_PHOENIX_INDEX_MR_LOG_BEYOND_MAX_LOOKBACK_ERRORS = false;
-    private static boolean ignoreIndexRebuildForTesting  = false;
     protected final UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver;
 
     protected IndexTool.IndexDisableLoggingType disableLoggingVerifyType = IndexTool.IndexDisableLoggingType.NONE;
@@ -986,8 +984,7 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
                 indexUpdates.addAll(mutationList);
                 batchSize += mutationList.size();
                 if (batchSize >= maxBatchSize) {
-                    ungroupedAggregateRegionObserver.checkForRegionClosing();
-                    indexHTable.batch(indexUpdates);
+                    commitBatch(indexUpdates);
                     batchSize = 0;
                     indexUpdates = new ArrayList<Mutation>(maxBatchSize);
                 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
index 4181aca..7f61be3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
@@ -35,11 +35,8 @@ import java.util.concurrent.Future;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
-
-import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
-
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -65,7 +62,6 @@ import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder;
 import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager;
 import org.apache.phoenix.hbase.index.parallel.WaitForCompletionTaskRunner;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
-
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.mapreduce.index.IndexTool;
@@ -87,7 +83,6 @@ public class IndexerRegionScanner extends GlobalIndexRegionScanner {
     protected Map<byte[], Put> indexKeyToDataPutMap;
     protected UngroupedAggregateRegionObserver.MutationList mutations;
     private boolean partialRebuild = false;
-    private UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver;
 
     IndexerRegionScanner (final RegionScanner innerScanner, final Region region, final Scan scan,
                           final RegionCoprocessorEnvironment env,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index b3cd21b..7b66364 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -99,6 +99,7 @@ import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.hbase.index.write.IndexWriter;
+import org.apache.phoenix.index.GlobalIndexChecker;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
@@ -1072,6 +1073,20 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         }
     }
 
+    private RegionScanner getRegionScanner(final RegionScanner innerScanner, final Region region, final Scan scan,
+                                           final RegionCoprocessorEnvironment env, final boolean oldCoproc)
+            throws IOException {
+        if (oldCoproc) {
+            return new IndexerRegionScanner(innerScanner, region, scan, env, this);
+        } else {
+            if (region.getTableDesc().hasCoprocessor(GlobalIndexChecker.class.getCanonicalName())) {
+                return new IndexRepairRegionScanner(innerScanner, region, scan, env, this);
+            } else {
+                return new IndexRebuildRegionScanner(innerScanner, region, scan, env, this);
+            }
+        }
+    }
+
     private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Region region, final Scan scan,
                                          final RegionCoprocessorEnvironment env) throws IOException {
         boolean oldCoproc = region.getTableDesc().hasCoprocessor(Indexer.class.getCanonicalName());
@@ -1101,25 +1116,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             }
             innerScanner.close();
             RegionScanner scanner = region.getScanner(rawScan);
-            if (oldCoproc) {
-                return new IndexerRegionScanner(scanner, region, scan, env, this);
-            } else {
-                if (region.getTableDesc().hasCoprocessor(IndexRegionObserver.class.getCanonicalName())) {
-                    return new IndexRebuildRegionScanner(scanner, region, scan, env, this);
-                } else {
-                    return new IndexRepairRegionScanner(scanner, region, scan, env, this);
-                }
-            }
-        }
-        if (oldCoproc) {
-            return new IndexerRegionScanner(innerScanner, region, scan, env, this);
-        } else {
-            if (region.getTableDesc().hasCoprocessor(IndexRegionObserver.class.getCanonicalName())) {
-                return new IndexRebuildRegionScanner(innerScanner, region, scan, env, this);
-            } else {
-                return new IndexRepairRegionScanner(innerScanner, region, scan, env, this);
-            }
+            return getRegionScanner(scanner, region, scan, env, oldCoproc);
         }
+        return getRegionScanner(innerScanner, region, scan, env, oldCoproc);
     }
     
     private RegionScanner collectStats(final RegionScanner innerScanner, StatisticsCollector stats,