You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2018/04/11 20:39:36 UTC
[2/2] phoenix git commit: PHOENIX-4366 Rebuilding a local index fails sometimes
PHOENIX-4366 Rebuilding a local index fails sometimes
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/9fa8058c
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/9fa8058c
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/9fa8058c
Branch: refs/heads/4.x-HBase-1.3
Commit: 9fa8058c60172d4d910c768ba6511fb330edba0b
Parents: 75cdb4e
Author: James Taylor <jt...@salesforce.com>
Authored: Wed Apr 11 13:37:45 2018 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Apr 11 13:37:45 2018 -0700
----------------------------------------------------------------------
.../coprocessor/BaseScannerRegionObserver.java | 7 +----
.../GroupedAggregateRegionObserver.java | 4 +++
.../phoenix/coprocessor/ScanRegionObserver.java | 4 ++-
.../UngroupedAggregateRegionObserver.java | 7 ++++-
.../NonAggregateRegionScannerFactory.java | 29 +++++++++-----------
.../phoenix/iterate/RegionScannerFactory.java | 4 +--
.../apache/phoenix/iterate/SnapshotScanner.java | 6 +---
7 files changed, 30 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9fa8058c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 4a2c465..110a4ed 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -140,8 +140,6 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
/** Exposed for testing */
public static final String SCANNER_OPENED_TRACE_INFO = "Scanner opened on server";
- protected QualifierEncodingScheme encodingScheme;
- protected boolean useNewValueColumnQualifier;
@Override
public void start(CoprocessorEnvironment e) throws IOException {
@@ -212,8 +210,6 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
// start exclusive and the stop inclusive.
ScanUtil.setupReverseScan(scan);
}
- this.encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
- this.useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan);
return s;
}
@@ -351,8 +347,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
final byte[][] viewConstants, final TupleProjector projector,
final ImmutableBytesWritable ptr, final boolean useQualiferAsListIndex) {
- RegionScannerFactory regionScannerFactory = new NonAggregateRegionScannerFactory(c.getEnvironment(),
- useNewValueColumnQualifier, encodingScheme);
+ RegionScannerFactory regionScannerFactory = new NonAggregateRegionScannerFactory(c.getEnvironment());
return regionScannerFactory.getWrappedScanner(c.getEnvironment(), s, null, null, offset, scan, dataColumns, tupleProjector,
dataRegion, indexMaintainer, null, viewConstants, null, null, projector, ptr, useQualiferAsListIndex);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9fa8058c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index 67cc114..201bcec 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -62,6 +62,7 @@ import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList;
import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
@@ -110,6 +111,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
keyOrdered = true;
}
int offset = 0;
+ boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan);
if (ScanUtil.isLocalIndex(scan)) {
/*
* For local indexes, we need to set an offset on row key expressions to skip
@@ -395,6 +397,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
boolean useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan));
final boolean spillableEnabled =
conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE);
+ final PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
GroupByCache groupByCache =
GroupByCacheFactory.INSTANCE.newCache(
@@ -466,6 +469,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
}
final Pair<Integer, Integer> minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan);
final boolean useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(minMaxQualifiers);
+ final PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
return new BaseRegionScanner(scanner) {
private long rowCount = 0;
private ImmutableBytesPtr currentKey = null;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9fa8058c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
index b006ef6..2d9cd4f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.expression.OrderByExpression;
import org.apache.phoenix.iterate.NonAggregateRegionScannerFactory;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.EncodedColumnsUtil;
/**
*
@@ -68,7 +70,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
@Override
protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws Throwable {
- NonAggregateRegionScannerFactory nonAggregateROUtil = new NonAggregateRegionScannerFactory(c.getEnvironment(), useNewValueColumnQualifier, encodingScheme);
+ NonAggregateRegionScannerFactory nonAggregateROUtil = new NonAggregateRegionScannerFactory(c.getEnvironment());
return nonAggregateROUtil.getRegionScanner(scan, s);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9fa8058c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 27d3880..de57772 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -376,7 +376,12 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
env, region.getRegionInfo().getTable().getNameAsString(), ts,
gp_width_bytes, gp_per_region_bytes);
return collectStats(s, statsCollector, region, scan, env.getConfiguration());
- } else if (ScanUtil.isIndexRebuild(scan)) { return rebuildIndices(s, region, scan, env.getConfiguration()); }
+ } else if (ScanUtil.isIndexRebuild(scan)) {
+ return rebuildIndices(s, region, scan, env.getConfiguration());
+ }
+
+ PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
+ boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan);
int offsetToBe = 0;
if (localIndexScan) {
/*
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9fa8058c/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
index c097d0d..da99ff5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
@@ -74,20 +74,13 @@ import static org.apache.phoenix.util.EncodedColumnsUtil.getMinMaxQualifiersFrom
public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
- private ImmutableBytesWritable ptr = new ImmutableBytesWritable();
- private KeyValueSchema kvSchema = null;
- private ValueBitSet kvSchemaBitSet;
-
- public NonAggregateRegionScannerFactory(RegionCoprocessorEnvironment env, boolean useNewValueColumnQualifier,
- PTable.QualifierEncodingScheme encodingScheme) {
+ public NonAggregateRegionScannerFactory(RegionCoprocessorEnvironment env) {
this.env = env;
- this.useNewValueColumnQualifier = useNewValueColumnQualifier;
- this.encodingScheme = encodingScheme;
}
@Override
public RegionScanner getRegionScanner(final Scan scan, final RegionScanner s) throws Throwable {
-
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
int offset = 0;
if (ScanUtil.isLocalIndex(scan)) {
/*
@@ -106,9 +99,17 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
scanOffset = (Integer)PInteger.INSTANCE.toObject(scanOffsetBytes);
}
RegionScanner innerScanner = s;
+ PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
+ boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan);
Set<KeyValueColumnExpression> arrayKVRefs = Sets.newHashSet();
- Expression[] arrayFuncRefs = deserializeArrayPostionalExpressionInfoFromScan(scan, innerScanner, arrayKVRefs);
+ Expression[] arrayFuncRefs = deserializeArrayPositionalExpressionInfoFromScan(scan, innerScanner, arrayKVRefs);
+ KeyValueSchema.KeyValueSchemaBuilder builder = new KeyValueSchema.KeyValueSchemaBuilder(0);
+ for (Expression expression : arrayFuncRefs) {
+ builder.addField(expression);
+ }
+ KeyValueSchema kvSchema = builder.build();
+ ValueBitSet kvSchemaBitSet = ValueBitSet.newInstance(kvSchema);
TupleProjector tupleProjector = null;
Region dataRegion = null;
IndexMaintainer indexMaintainer = null;
@@ -196,13 +197,12 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
}
}
- private Expression[] deserializeArrayPostionalExpressionInfoFromScan(Scan scan, RegionScanner s,
- Set<KeyValueColumnExpression> arrayKVRefs) {
+ private Expression[] deserializeArrayPositionalExpressionInfoFromScan(Scan scan, RegionScanner s,
+ Set<KeyValueColumnExpression> arrayKVRefs) {
byte[] specificArrayIdx = scan.getAttribute(BaseScannerRegionObserver.SPECIFIC_ARRAY_INDEX);
if (specificArrayIdx == null) {
return null;
}
- KeyValueSchema.KeyValueSchemaBuilder builder = new KeyValueSchema.KeyValueSchemaBuilder(0);
ByteArrayInputStream stream = new ByteArrayInputStream(specificArrayIdx);
try {
DataInputStream input = new DataInputStream(stream);
@@ -220,10 +220,7 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
ArrayIndexFunction arrayIdxFunc = new ArrayIndexFunction();
arrayIdxFunc.readFields(input);
arrayFuncRefs[i] = arrayIdxFunc;
- builder.addField(arrayIdxFunc);
}
- kvSchema = builder.build();
- kvSchemaBitSet = ValueBitSet.newInstance(kvSchema);
return arrayFuncRefs;
} catch (IOException e) {
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9fa8058c/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
index 3dcbef9..aed5805 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
@@ -41,6 +41,7 @@ import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.ValueBitSet;
import org.apache.phoenix.schema.tuple.*;
import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.ServerUtil;
@@ -54,8 +55,6 @@ import java.util.Set;
public abstract class RegionScannerFactory {
protected RegionCoprocessorEnvironment env;
- protected boolean useNewValueColumnQualifier;
- protected PTable.QualifierEncodingScheme encodingScheme;
/**
* Returns the region based on the value of the
@@ -107,6 +106,7 @@ public abstract class RegionScannerFactory {
private boolean hasReferences = checkForReferenceFiles();
private HRegionInfo regionInfo = env.getRegionInfo();
private byte[] actualStartKey = getActualStartKey();
+ private boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan);
// If there are any reference files after local index region merge some cases we might
// get the records less than scan start row key. This will happen when we replace the
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9fa8058c/phoenix-core/src/main/java/org/apache/phoenix/iterate/SnapshotScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SnapshotScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SnapshotScanner.java
index 68592ef..9e2a08b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SnapshotScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SnapshotScanner.java
@@ -54,15 +54,11 @@ public class SnapshotScanner extends AbstractClientScanner {
values = new ArrayList<>();
this.region = HRegion.openHRegion(conf, fs, rootDir, hri, htd, null, null, null);
- // process the region scanner for non-aggregate queries
- PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
- boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan);
-
RegionCoprocessorEnvironment snapshotEnv = getSnapshotContextEnvironment(conf);
RegionScannerFactory regionScannerFactory;
if (scan.getAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY) != null) {
- regionScannerFactory = new NonAggregateRegionScannerFactory(snapshotEnv, useNewValueColumnQualifier, encodingScheme);
+ regionScannerFactory = new NonAggregateRegionScannerFactory(snapshotEnv);
} else {
/* future work : Snapshot M/R jobs for aggregate queries*/
throw new UnsupportedOperationException("Snapshot M/R jobs not available for aggregate queries");