You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2020/10/18 05:06:28 UTC
[phoenix] branch 4.x updated: PHOENIX-6181 Addendum
This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push:
new 4fc394c PHOENIX-6181 Addendum
4fc394c is described below
commit 4fc394c3d9c92cbefa20800dd642df20524933a4
Author: Kadir Ozdemir <ko...@salesforce.com>
AuthorDate: Fri Oct 16 20:11:53 2020 -0700
PHOENIX-6181 Addendum
---
.../end2end/ConcurrentMutationsExtendedIT.java | 6 ----
.../coprocessor/GlobalIndexRegionScanner.java | 5 +---
.../phoenix/coprocessor/IndexerRegionScanner.java | 5 ----
.../UngroupedAggregateRegionObserver.java | 35 +++++++++++-----------
4 files changed, 18 insertions(+), 33 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
index 0a52c66..c2e0dc4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
@@ -33,7 +32,6 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.*;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -45,7 +43,6 @@ import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import static org.apache.phoenix.mapreduce.PhoenixJobCounters.INPUT_RECORDS;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT;
@@ -53,15 +50,12 @@ import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFT
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_VALID_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_OLD_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.SCANNED_DATA_ROW_COUNT;
import static org.junit.Assert.*;
@RunWith(RunUntilFailure.class) @Category(NeedsOwnMiniClusterTest.class)
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
index e1eb9ff..5005339 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
@@ -66,7 +66,6 @@ import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexUtil;
-import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.ServerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -115,7 +114,6 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
public static final String PHOENIX_INDEX_MR_LOG_BEYOND_MAX_LOOKBACK_ERRORS =
"phoenix.index.mr.log.beyond.max.lookback.errors";
public static final boolean DEFAULT_PHOENIX_INDEX_MR_LOG_BEYOND_MAX_LOOKBACK_ERRORS = false;
- private static boolean ignoreIndexRebuildForTesting = false;
protected final UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver;
protected IndexTool.IndexDisableLoggingType disableLoggingVerifyType = IndexTool.IndexDisableLoggingType.NONE;
@@ -986,8 +984,7 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
indexUpdates.addAll(mutationList);
batchSize += mutationList.size();
if (batchSize >= maxBatchSize) {
- ungroupedAggregateRegionObserver.checkForRegionClosing();
- indexHTable.batch(indexUpdates);
+ commitBatch(indexUpdates);
batchSize = 0;
indexUpdates = new ArrayList<Mutation>(maxBatchSize);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
index 4181aca..7f61be3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
@@ -35,11 +35,8 @@ import java.util.concurrent.Future;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
-
-import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
-
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
@@ -65,7 +62,6 @@ import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder;
import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager;
import org.apache.phoenix.hbase.index.parallel.WaitForCompletionTaskRunner;
import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
-
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.mapreduce.index.IndexTool;
@@ -87,7 +83,6 @@ public class IndexerRegionScanner extends GlobalIndexRegionScanner {
protected Map<byte[], Put> indexKeyToDataPutMap;
protected UngroupedAggregateRegionObserver.MutationList mutations;
private boolean partialRebuild = false;
- private UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver;
IndexerRegionScanner (final RegionScanner innerScanner, final Region region, final Scan scan,
final RegionCoprocessorEnvironment env,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index b3cd21b..7b66364 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -99,6 +99,7 @@ import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.hbase.index.write.IndexWriter;
+import org.apache.phoenix.index.GlobalIndexChecker;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
@@ -1072,6 +1073,20 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
}
+ private RegionScanner getRegionScanner(final RegionScanner innerScanner, final Region region, final Scan scan,
+ final RegionCoprocessorEnvironment env, final boolean oldCoproc)
+ throws IOException {
+ if (oldCoproc) {
+ return new IndexerRegionScanner(innerScanner, region, scan, env, this);
+ } else {
+ if (region.getTableDesc().hasCoprocessor(GlobalIndexChecker.class.getCanonicalName())) {
+ return new IndexRepairRegionScanner(innerScanner, region, scan, env, this);
+ } else {
+ return new IndexRebuildRegionScanner(innerScanner, region, scan, env, this);
+ }
+ }
+ }
+
private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Region region, final Scan scan,
final RegionCoprocessorEnvironment env) throws IOException {
boolean oldCoproc = region.getTableDesc().hasCoprocessor(Indexer.class.getCanonicalName());
@@ -1101,25 +1116,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
innerScanner.close();
RegionScanner scanner = region.getScanner(rawScan);
- if (oldCoproc) {
- return new IndexerRegionScanner(scanner, region, scan, env, this);
- } else {
- if (region.getTableDesc().hasCoprocessor(IndexRegionObserver.class.getCanonicalName())) {
- return new IndexRebuildRegionScanner(scanner, region, scan, env, this);
- } else {
- return new IndexRepairRegionScanner(scanner, region, scan, env, this);
- }
- }
- }
- if (oldCoproc) {
- return new IndexerRegionScanner(innerScanner, region, scan, env, this);
- } else {
- if (region.getTableDesc().hasCoprocessor(IndexRegionObserver.class.getCanonicalName())) {
- return new IndexRebuildRegionScanner(innerScanner, region, scan, env, this);
- } else {
- return new IndexRepairRegionScanner(innerScanner, region, scan, env, this);
- }
+ return getRegionScanner(scanner, region, scan, env, oldCoproc);
}
+ return getRegionScanner(innerScanner, region, scan, env, oldCoproc);
}
private RegionScanner collectStats(final RegionScanner innerScanner, StatisticsCollector stats,