Posted to commits@phoenix.apache.org by ka...@apache.org on 2020/11/14 01:59:13 UTC
[phoenix] branch master updated: PHOENIX-5998 Paged server side ungrouped aggregate operations
This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 881dd8d PHOENIX-5998 Paged server side ungrouped aggregate operations
881dd8d is described below
commit 881dd8d6033def07d0371cebbaa5635031594277
Author: Kadir Ozdemir <ko...@salesforce.com>
AuthorDate: Wed Nov 11 16:32:24 2020 -0800
PHOENIX-5998 Paged server side ungrouped aggregate operations
---
.../apache/phoenix/end2end/SpillableGroupByIT.java | 1 +
.../org/apache/phoenix/end2end/UpsertSelectIT.java | 1 -
.../coprocessor/BaseScannerRegionObserver.java | 6 +-
.../UngroupedAggregateRegionObserver.java | 719 ++++-----------------
.../UngroupedAggregateRegionScanner.java | 670 +++++++++++++++++++
.../phoenix/iterate/TableResultIterator.java | 4 +
.../UngroupedAggregatingResultIterator.java | 43 +-
.../org/apache/phoenix/query/QueryServices.java | 2 +
.../apache/phoenix/query/QueryServicesOptions.java | 3 +-
.../java/org/apache/phoenix/query/BaseTest.java | 9 +-
10 files changed, 838 insertions(+), 620 deletions(-)
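
Per the diffstat, the change moves the ungrouped aggregation loop out of UngroupedAggregateRegionObserver into a new UngroupedAggregateRegionScanner and bounds each scanner RPC by a configurable time budget (QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS). A minimal sketch of that paging idea, assuming a plain iterator in place of the region scanner and a running sum in place of the server aggregators (class and method names are hypothetical, not the committed implementation):

import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of a time-budgeted ("paged") aggregation loop. The real
// scanner aggregates HBase Cells; here an Iterator<Long> stands in for the
// scanned rows and a running sum stands in for the server aggregators.
public class PagedAggregator {
    private final Iterator<Long> rows;  // stand-in for the wrapped region scanner
    private final long pageSizeMs;      // cf. QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS

    public PagedAggregator(Iterator<Long> rows, long pageSizeMs) {
        this.rows = rows;
        this.pageSizeMs = pageSizeMs;
    }

    // Aggregates rows until the scan is exhausted or the page time budget
    // expires. Returns true when more rows remain, i.e. the caller should
    // request another page rather than treat this result as final.
    public boolean nextPage(List<Long> out) {
        long start = System.currentTimeMillis();
        long sum = 0;
        while (rows.hasNext()) {
            sum += rows.next();  // the real code feeds the row to its aggregators
            if (System.currentTimeMillis() - start >= pageSizeMs) {
                break;           // time budget for this RPC is used up
            }
        }
        out.add(sum);            // partial (or final) aggregate for this page
        return rows.hasNext();   // true => another next() call is needed
    }
}

Keeping each RPC inside a time budget avoids scanner timeouts and long-held region resources on large ungrouped aggregates; judging from the diffstat, the client-side UngroupedAggregatingResultIterator combines the per-page partial aggregates so the application still sees a single final row.
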
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpillableGroupByIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpillableGroupByIT.java
index d7e1b37..c6692f8 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpillableGroupByIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpillableGroupByIT.java
@@ -80,6 +80,7 @@ public class SpillableGroupByIT extends BaseOwnClusterIT {
props.put(QueryServices.STATS_COLLECTION_ENABLED, Boolean.toString(false));
props.put(QueryServices.EXPLAIN_CHUNK_COUNT_ATTRIB, Boolean.TRUE.toString());
props.put(QueryServices.EXPLAIN_ROW_COUNT_ATTRIB, Boolean.TRUE.toString());
+ props.put(QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS, Long.toString(1000));
// Must update config before starting server
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
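
The test lowers the page size to 1000 ms so the paged code path is exercised even on small data sets. For context, a hedged sketch of the kind of query that takes the ungrouped aggregate path (table name hypothetical): an aggregate with no GROUP BY, which with paging enabled the server answers across several short RPCs while the client still sees one row.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class UngroupedAggExample {
    public static void main(String[] args) throws Exception {
        // MY_TABLE is a placeholder; any Phoenix table works.
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
             Statement stmt = conn.createStatement();
             // No GROUP BY, so this query runs through the ungrouped aggregate path.
             ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM MY_TABLE")) {
            if (rs.next()) {
                System.out.println("row count: " + rs.getLong(1));
            }
        }
    }
}
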
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
index c55e9b7..007d1ef 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
@@ -1680,7 +1680,6 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
fail();
} catch (SQLException e) {
assertEquals(SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY.getErrorCode(), e.getErrorCode());
- assertFalse(e.getMessage().contains(invalidValue));
assertTrue(e.getMessage().contains(columnTypeInfo));
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 712f719..423d0d3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -74,9 +74,13 @@ abstract public class BaseScannerRegionObserver extends CompatBaseScannerRegionO
public static final String GROUP_BY_LIMIT = "_GroupByLimit";
public static final String LOCAL_INDEX = "_LocalIndex";
public static final String LOCAL_INDEX_BUILD = "_LocalIndexBuild";
- // The number of index rows to be rebuild in one RPC call
public static final String INDEX_REBUILD_PAGING = "_IndexRebuildPaging";
+ // The number of index rows to be rebuilt in one RPC call
public static final String INDEX_REBUILD_PAGE_ROWS = "_IndexRebuildPageRows";
+ public static final String SERVER_PAGING = "_ServerPaging";
+ // The time budget, in milliseconds, for scanning rows in one RPC call
+ public static final String AGGREGATE_PAGE_SIZE_IN_MS = "_AggregatePageSizeInMs";
+
// Index verification type done by the index tool
public static final String INDEX_REBUILD_VERIFY_TYPE = "_IndexRebuildVerifyType";
public static final String INDEX_RETRY_VERIFY = "_IndexRetryVerify";
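
These constants are scan-attribute keys: the client sets them on the Scan and the coprocessor reads them back on the server. A minimal round-trip sketch using the standard HBase client API (the boolean encoding shown is an assumption, not taken from this commit):

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class ScanAttributeExample {
    static final String SERVER_PAGING = "_ServerPaging";

    public static void main(String[] args) {
        Scan scan = new Scan();
        // Client side: plant the attribute on the scan.
        scan.setAttribute(SERVER_PAGING, Bytes.toBytes(true));

        // Server side (inside the observer): read it back off the same scan.
        byte[] attr = scan.getAttribute(SERVER_PAGING);
        boolean serverPagingEnabled = attr != null && Bytes.toBoolean(attr);
        System.out.println("server paging enabled: " + serverPagingEnabled);
    }
}
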
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 2d58c69..c8fd915 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -21,9 +21,6 @@ import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
-import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
-import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB;
-import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.COMPACTION_UPDATE_STATS_ROW_COUNT;
import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.CONCURRENT_UPDATE_STATS_ROW_COUNT;
@@ -35,7 +32,6 @@ import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.sql.SQLException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;
@@ -43,12 +39,8 @@ import java.util.concurrent.Callable;
import javax.annotation.concurrent.GuardedBy;
-import org.apache.phoenix.index.GlobalIndexChecker;
-import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellBuilderFactory;
-import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
@@ -56,11 +48,7 @@ import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
@@ -86,27 +74,16 @@ import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.WritableUtils;
-import org.apache.phoenix.cache.GlobalCache;
-import org.apache.phoenix.cache.TenantCache;
import org.apache.phoenix.coprocessor.generated.PTableProtos;
-import org.apache.phoenix.exception.DataExceedsCapacityException;
import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.ExpressionType;
-import org.apache.phoenix.expression.aggregator.Aggregator;
-import org.apache.phoenix.expression.aggregator.Aggregators;
-import org.apache.phoenix.expression.aggregator.ServerAggregators;
import org.apache.phoenix.filter.AllVersionsIndexRebuildFilter;
-import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.hbase.index.Indexer;
-import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.exception.IndexWriteException;
-import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
+import org.apache.phoenix.index.GlobalIndexChecker;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
@@ -116,47 +93,26 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.mapreduce.index.IndexTool;
-import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
-import org.apache.phoenix.schema.PColumn;
-import org.apache.phoenix.schema.PRow;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.schema.PTableType;
-import org.apache.phoenix.schema.RowKeySchema;
-import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.TableNotFoundException;
-import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.schema.ValueSchema.Field;
import org.apache.phoenix.schema.stats.NoOpStatisticsCollector;
import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker;
import org.apache.phoenix.schema.stats.StatisticsCollector;
import org.apache.phoenix.schema.stats.StatisticsCollectorFactory;
import org.apache.phoenix.schema.stats.StatsCollectionDisabledOnServerException;
-import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList;
-import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
-import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple;
-import org.apache.phoenix.schema.tuple.Tuple;
-import org.apache.phoenix.schema.types.PBinary;
-import org.apache.phoenix.schema.types.PChar;
import org.apache.phoenix.schema.types.PDataType;
-import org.apache.phoenix.schema.types.PDouble;
-import org.apache.phoenix.schema.types.PFloat;
import org.apache.phoenix.schema.types.PLong;
-import org.apache.phoenix.transaction.PhoenixTransactionContext;
-import org.apache.phoenix.transaction.PhoenixTransactionProvider;
-import org.apache.phoenix.transaction.TransactionFactory;
-import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
-import org.apache.phoenix.util.ExpressionUtil;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.PhoenixKeyValueUtil;
-import org.apache.phoenix.util.LogUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
@@ -165,15 +121,11 @@ import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.ServerUtil.ConnectionType;
-import org.apache.phoenix.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.phoenix.thirdparty.com.google.common.base.Throwables;
-import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
-import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
-import org.apache.phoenix.thirdparty.com.google.common.primitives.Ints;
-
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
/**
* Region observer that aggregates ungrouped rows (i.e. a SQL query with an aggregation function and no GROUP BY).
@@ -220,7 +172,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
@GuardedBy("lock")
private boolean isRegionClosingOrSplitting = false;
private static final Logger LOGGER = LoggerFactory.getLogger(UngroupedAggregateRegionObserver.class);
- private KeyValueBuilder kvBuilder;
private Configuration upsertSelectConfig;
private Configuration compactionConfig;
private Configuration indexWriteConfig;
@@ -233,9 +184,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
@Override
public void start(CoprocessorEnvironment e) throws IOException {
- // Can't use ClientKeyValueBuilder on server-side because the memstore expects to
- // be able to get a single backing buffer for a KeyValue.
- this.kvBuilder = GenericKeyValueBuilder.INSTANCE;
/*
* We need to create a copy of region's configuration since we don't want any side effect of
* setting the RpcControllerFactory.
@@ -248,19 +196,45 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
* priority handlers which could result in a deadlock.
*/
upsertSelectConfig.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
- InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class);
+ InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class);
compactionConfig = ServerUtil.getCompactionConfig(e.getConfiguration());
// For retries of index write failures, use the same # of retries as the rebuilder
indexWriteConfig = PropertiesUtil.cloneConfig(e.getConfiguration());
indexWriteConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
- e.getConfiguration().getInt(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER,
- QueryServicesOptions.DEFAULT_INDEX_REBUILD_RPC_RETRIES_COUNTER));
+ e.getConfiguration().getInt(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER,
+ QueryServicesOptions.DEFAULT_INDEX_REBUILD_RPC_RETRIES_COUNTER));
indexWriteProps = new ReadOnlyProps(indexWriteConfig.iterator());
}
- public void commitBatchWithRetries(final Region region, final List<Mutation> localRegionMutations, final long blockingMemstoreSize) throws IOException {
+ Configuration getUpsertSelectConfig() {
+ return upsertSelectConfig;
+ }
+
+ void incrementScansReferenceCount() throws IOException {
+ synchronized (lock) {
+ if (isRegionClosingOrSplitting) {
+ throw new IOException("Temporarily unable to write from scan because region is closing or splitting");
+ }
+ scansReferenceCount++;
+ lock.notifyAll();
+ }
+ }
+
+ void decrementScansReferenceCount() {
+ synchronized (lock) {
+ scansReferenceCount--;
+ if (scansReferenceCount < 0) {
+ LOGGER.warn(
+ "Scan reference count went below zero. Something isn't correct. Resetting it back to zero");
+ scansReferenceCount = 0;
+ }
+ lock.notifyAll();
+ }
+ }
+
+ void commitBatchWithRetries(final Region region, final List<Mutation> localRegionMutations, final long blockingMemstoreSize) throws IOException {
try {
commitBatch(region, localRegionMutations, blockingMemstoreSize);
} catch (IOException e) {
@@ -278,10 +252,10 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
}
- private void commitBatch(Region region, List<Mutation> mutations, long blockingMemstoreSize) throws IOException {
- if (mutations.isEmpty()) {
- return;
- }
+ void commitBatch(Region region, List<Mutation> mutations, long blockingMemstoreSize) throws IOException {
+ if (mutations.isEmpty()) {
+ return;
+ }
Mutation[] mutationArray = new Mutation[mutations.size()];
// When memstore size reaches blockingMemstoreSize we are waiting 3 seconds for the
@@ -300,9 +274,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
region.batchMutate(mutations.toArray(mutationArray));
}
- private void setIndexAndTransactionProperties(List<Mutation> mutations, byte[] indexUUID,
- byte[] indexMaintainersPtr, byte[] txState,
- byte[] clientVersionBytes, boolean useIndexProto) {
+ static void setIndexAndTransactionProperties(List<Mutation> mutations, byte[] indexUUID,
+ byte[] indexMaintainersPtr, byte[] txState,
+ byte[] clientVersionBytes, boolean useIndexProto) {
for (Mutation m : mutations) {
if (indexMaintainersPtr != null) {
m.setAttribute(useIndexProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMaintainersPtr);
@@ -320,9 +294,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
private void commitBatchWithTable(Table table, List<Mutation> mutations) throws IOException {
- if (mutations.isEmpty()) {
- return;
- }
+ if (mutations.isEmpty()) {
+ return;
+ }
LOGGER.debug("Committing batch of " + mutations.size() + " mutations for " + table);
try {
@@ -340,9 +314,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
* a high chance that flush might not proceed and memstore won't be freed up.
* @throws IOException
*/
- public void checkForRegionClosingOrSplitting() throws IOException {
+ void checkForRegionClosingOrSplitting() throws IOException {
synchronized (lock) {
- if(isRegionClosingOrSplitting) {
+ if (isRegionClosingOrSplitting) {
lock.notifyAll();
throw new IOException("Region is getting closed. Not allowing to write to avoid possible deadlock.");
}
@@ -369,14 +343,15 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
public static class MutationList extends ArrayList<Mutation> {
private long byteSize = 0L;
+
public MutationList() {
super();
}
-
- public MutationList(int size){
+
+ public MutationList(int size) {
super(size);
}
-
+
@Override
public boolean add(Mutation e) {
boolean r = super.add(e);
@@ -392,24 +367,25 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
@Override
public void clear() {
- byteSize = 0L;
+ byteSize = 0L;
super.clear();
}
}
- public static long getBlockingMemstoreSize(Region region, Configuration conf) {
- long flushSize = region.getTableDescriptor().getMemStoreFlushSize();
-
- if (flushSize <= 0) {
- flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
- TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE);
- }
- return flushSize * (conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
- HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER)-1);
- }
-
+ static long getBlockingMemstoreSize(Region region, Configuration conf) {
+ long flushSize = region.getTableDescriptor().getMemStoreFlushSize();
+
+ if (flushSize <= 0) {
+ flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
+ TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE);
+ }
+ return flushSize * (conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
+ HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER) - 1);
+ }
+
@Override
- protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException, SQLException {
+ protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan,
+ final RegionScanner s) throws IOException, SQLException {
final RegionCoprocessorEnvironment env = c.getEnvironment();
final Region region = env.getRegion();
long ts = scan.getTimeRange().getMax();
@@ -436,39 +412,15 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
});
}
-
- PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan);
int offsetToBe = 0;
if (localIndexScan) {
- /*
- * For local indexes, we need to set an offset on row key expressions to skip
- * the region start key.
- */
offsetToBe = region.getRegionInfo().getStartKey().length != 0 ? region.getRegionInfo().getStartKey().length :
- region.getRegionInfo().getEndKey().length;
- ScanUtil.setRowKeyOffset(scan, offsetToBe);
+ region.getRegionInfo().getEndKey().length;
}
final int offset = offsetToBe;
-
- PTable projectedTable = null;
- PTable writeToTable = null;
- byte[][] values = null;
byte[] descRowKeyTableBytes = scan.getAttribute(UPGRADE_DESC_ROW_KEY);
boolean isDescRowKeyOrderUpgrade = descRowKeyTableBytes != null;
- if (isDescRowKeyOrderUpgrade) {
- LOGGER.debug("Upgrading row key for " + region.getRegionInfo().getTable().getNameAsString());
- projectedTable = deserializeTable(descRowKeyTableBytes);
- try {
- writeToTable = PTableImpl.builderWithColumns(projectedTable,
- getColumnsToClone(projectedTable))
- .setRowKeyOrderOptimizable(true)
- .build();
- } catch (SQLException e) {
- ServerUtil.throwIOException("Upgrade failed", e); // Impossible
- }
- values = new byte[projectedTable.getPKColumns().size()][];
- }
boolean useProto = false;
byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD_PROTO);
useProto = localIndexBytes != null;
@@ -476,47 +428,12 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
}
List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes, useProto);
- MutationList indexMutations = localIndexBytes == null ? new MutationList() : new MutationList(1024);
-
RegionScanner theScanner = s;
-
- byte[] replayMutations = scan.getAttribute(BaseScannerRegionObserver.REPLAY_WRITES);
- byte[] indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID);
- byte[] txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE);
- byte[] clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
- PhoenixTransactionProvider txnProvider = null;
- if (txState != null) {
- int clientVersion = clientVersionBytes == null ? ScanUtil.UNKNOWN_CLIENT_VERSION : Bytes.toInt(clientVersionBytes);
- txnProvider = TransactionFactory.getTransactionProvider(txState, clientVersion);
- }
- List<Expression> selectExpressions = null;
byte[] upsertSelectTable = scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_TABLE);
- boolean isUpsert = false;
boolean isDelete = false;
- byte[] deleteCQ = null;
- byte[] deleteCF = null;
- byte[] emptyCF = null;
- Connection targetHConn = null;
- Table targetHTable = null;
- boolean isPKChanging = false;
- ImmutableBytesWritable ptr = new ImmutableBytesWritable();
- if (upsertSelectTable != null) {
- isUpsert = true;
- projectedTable = deserializeTable(upsertSelectTable);
- targetHConn = ConnectionFactory.createConnection(upsertSelectConfig);
- targetHTable = targetHConn.getTable(
- TableName.valueOf(projectedTable.getPhysicalName().getBytes()));
- selectExpressions = deserializeExpressions(scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS));
- values = new byte[projectedTable.getPKColumns().size()][];
- isPKChanging = ExpressionUtil.isPkPositionChanging(new TableRef(projectedTable), selectExpressions);
- } else {
+ if (upsertSelectTable == null) {
byte[] isDeleteAgg = scan.getAttribute(BaseScannerRegionObserver.DELETE_AGG);
isDelete = isDeleteAgg != null && Bytes.compareTo(PDataType.TRUE_BYTES, isDeleteAgg) == 0;
- if (!isDelete) {
- deleteCF = scan.getAttribute(BaseScannerRegionObserver.DELETE_CF);
- deleteCQ = scan.getAttribute(BaseScannerRegionObserver.DELETE_CQ);
- }
- emptyCF = scan.getAttribute(BaseScannerRegionObserver.EMPTY_CF);
}
TupleProjector tupleProjector = null;
byte[][] viewConstants = null;
@@ -531,422 +448,18 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
theScanner =
- getWrappedScanner(c, theScanner, offset, scan, dataColumns, tupleProjector,
- region, indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr, useQualifierAsIndex);
- }
-
- if (j != null) {
- theScanner = new HashJoinRegionScanner(theScanner, p, j, ScanUtil.getTenantId(scan), env, useQualifierAsIndex, useNewValueColumnQualifier);
+ getWrappedScanner(c, theScanner, offset, scan, dataColumns, tupleProjector,
+ region, indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr, useQualifierAsIndex);
}
-
- int maxBatchSize = 0;
- long maxBatchSizeBytes = 0L;
- MutationList mutations = new MutationList();
- boolean needToWrite = false;
- Configuration conf = env.getConfiguration();
-
- /**
- * Slow down the writes if the memstore size more than
- * (hbase.hregion.memstore.block.multiplier - 1) times hbase.hregion.memstore.flush.size
- * bytes. This avoids flush storm to hdfs for cases like index building where reads and
- * write happen to all the table regions in the server.
- */
- final long blockingMemStoreSize = getBlockingMemstoreSize(region, conf) ;
- boolean buildLocalIndex = indexMaintainers != null && dataColumns==null && !localIndexScan;
- if(buildLocalIndex) {
- checkForLocalIndexColumnFamilies(region, indexMaintainers);
- }
- if (isDescRowKeyOrderUpgrade || isDelete || isUpsert
- || (deleteCQ != null && deleteCF != null) || emptyCF != null || buildLocalIndex) {
- needToWrite = true;
- if((isUpsert && (targetHTable == null ||
- !targetHTable.getName().equals(region.getTableDescriptor().getTableName())))) {
- needToWrite = false;
- }
- maxBatchSize = conf.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
- mutations = new MutationList(Ints.saturatedCast(maxBatchSize + maxBatchSize / 10));
- maxBatchSizeBytes = conf.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
- QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
- }
- boolean hasMore;
- int rowCount = 0;
- boolean hasAny = false;
- boolean acquiredLock = false;
- boolean incrScanRefCount = false;
- Aggregators aggregators = null;
- Aggregator[] rowAggregators = null;
- final RegionScanner innerScanner = theScanner;
- final TenantCache tenantCache = GlobalCache.getTenantCache(env, ScanUtil.getTenantId(scan));
- try (MemoryChunk em = tenantCache.getMemoryManager().allocate(0)) {
- aggregators = ServerAggregators.deserialize(
- scan.getAttribute(BaseScannerRegionObserver.AGGREGATORS), conf, em);
- rowAggregators = aggregators.getAggregators();
- Pair<Integer, Integer> minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan);
- Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(LogUtil.addCustomAnnotations("Starting ungrouped coprocessor scan " + scan + " "+region.getRegionInfo(), ScanUtil.getCustomAnnotations(scan)));
- }
- boolean useIndexProto = true;
- byte[] indexMaintainersPtr = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
- // for backward compatiblity fall back to look by the old attribute
- if (indexMaintainersPtr == null) {
- indexMaintainersPtr = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
- useIndexProto = false;
- }
-
- if(needToWrite) {
- synchronized (lock) {
- if (isRegionClosingOrSplitting) {
- throw new IOException("Temporarily unable to write from scan because region is closing or splitting");
- }
- scansReferenceCount++;
- incrScanRefCount = true;
- lock.notifyAll();
- }
- }
- region.startRegionOperation();
- acquiredLock = true;
- synchronized (innerScanner) {
- do {
- List<Cell> results = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond(), encodingScheme) : new ArrayList<Cell>();
- // Results are potentially returned even when the return value of s.next is false
- // since this is an indication of whether or not there are more values after the
- // ones returned
- hasMore = innerScanner.nextRaw(results);
- if (!results.isEmpty()) {
- rowCount++;
- result.setKeyValues(results);
- if (isDescRowKeyOrderUpgrade) {
- Arrays.fill(values, null);
- Cell firstKV = results.get(0);
- RowKeySchema schema = projectedTable.getRowKeySchema();
- int maxOffset = schema.iterator(firstKV.getRowArray(), firstKV.getRowOffset() + offset, firstKV.getRowLength(), ptr);
- for (int i = 0; i < schema.getFieldCount(); i++) {
- Boolean hasValue = schema.next(ptr, i, maxOffset);
- if (hasValue == null) {
- break;
- }
- Field field = schema.getField(i);
- if (field.getSortOrder() == SortOrder.DESC) {
- // Special case for re-writing DESC ARRAY, as the actual byte value needs to change in this case
- if (field.getDataType().isArrayType()) {
- field.getDataType().coerceBytes(ptr, null, field.getDataType(),
- field.getMaxLength(), field.getScale(), field.getSortOrder(),
- field.getMaxLength(), field.getScale(), field.getSortOrder(), true); // force to use correct separator byte
- }
- // Special case for re-writing DESC CHAR or DESC BINARY, to force the re-writing of trailing space characters
- else if (field.getDataType() == PChar.INSTANCE || field.getDataType() == PBinary.INSTANCE) {
- int len = ptr.getLength();
- while (len > 0 && ptr.get()[ptr.getOffset() + len - 1] == StringUtil.SPACE_UTF8) {
- len--;
- }
- ptr.set(ptr.get(), ptr.getOffset(), len);
- // Special case for re-writing DESC FLOAT and DOUBLE, as they're not inverted like they should be (PHOENIX-2171)
- } else if (field.getDataType() == PFloat.INSTANCE || field.getDataType() == PDouble.INSTANCE) {
- byte[] invertedBytes = SortOrder.invert(ptr.get(), ptr.getOffset(), ptr.getLength());
- ptr.set(invertedBytes);
- }
- } else if (field.getDataType() == PBinary.INSTANCE) {
- // Remove trailing space characters so that the setValues call below will replace them
- // with the correct zero byte character. Note this is somewhat dangerous as these
- // could be legit, but I don't know what the alternative is.
- int len = ptr.getLength();
- while (len > 0 && ptr.get()[ptr.getOffset() + len - 1] == StringUtil.SPACE_UTF8) {
- len--;
- }
- ptr.set(ptr.get(), ptr.getOffset(), len);
- }
- values[i] = ptr.copyBytes();
- }
- writeToTable.newKey(ptr, values);
- if (Bytes.compareTo(
- firstKV.getRowArray(), firstKV.getRowOffset() + offset, firstKV.getRowLength(),
- ptr.get(),ptr.getOffset() + offset,ptr.getLength()) == 0) {
- continue;
- }
- byte[] newRow = ByteUtil.copyKeyBytesIfNecessary(ptr);
- if (offset > 0) { // for local indexes (prepend region start key)
- byte[] newRowWithOffset = new byte[offset + newRow.length];
- System.arraycopy(firstKV.getRowArray(), firstKV.getRowOffset(), newRowWithOffset, 0, offset);
- System.arraycopy(newRow, 0, newRowWithOffset, offset, newRow.length);
- newRow = newRowWithOffset;
- }
- byte[] oldRow = Bytes.copy(firstKV.getRowArray(), firstKV.getRowOffset(), firstKV.getRowLength());
- for (Cell cell : results) {
- // Copy existing cell but with new row key
- Cell newCell =
- CellBuilderFactory.create(CellBuilderType.DEEP_COPY).
- setRow(newRow).
- setFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()).
- setQualifier(cell.getQualifierArray(),
- cell.getQualifierOffset(), cell.getQualifierLength()).
- setTimestamp(cell.getTimestamp()).
- setType(cell.getType()).setValue(cell.getValueArray(),
- cell.getValueOffset(), cell.getValueLength()).build();
- switch (cell.getType()) {
- case Put:
- // If Put, point delete old Put
- Delete del = new Delete(oldRow);
- Cell newDelCell =
- CellBuilderFactory.create(CellBuilderType.DEEP_COPY).
- setRow(newRow).
- setFamily(cell.getFamilyArray(), cell.getFamilyOffset(),
- cell.getFamilyLength()).
- setQualifier(cell.getQualifierArray(),
- cell.getQualifierOffset(), cell.getQualifierLength()).
- setTimestamp(cell.getTimestamp()).
- setType(Cell.Type.Delete).
- setValue(ByteUtil.EMPTY_BYTE_ARRAY,
- 0, 0).build();
- del.add(newDelCell);
- mutations.add(del);
-
- Put put = new Put(newRow);
- put.add(newCell);
- mutations.add(put);
- break;
- case Delete:
- case DeleteColumn:
- case DeleteFamily:
- case DeleteFamilyVersion:
- Delete delete = new Delete(newRow);
- delete.add(newCell);
- mutations.add(delete);
- break;
- }
- }
- } else if (buildLocalIndex) {
- for (IndexMaintainer maintainer : indexMaintainers) {
- if (!results.isEmpty()) {
- result.getKey(ptr);
- ValueGetter valueGetter =
- maintainer.createGetterFromKeyValues(
- ImmutableBytesPtr.copyBytesIfNecessary(ptr),
- results);
- Put put = maintainer.buildUpdateMutation(kvBuilder,
- valueGetter, ptr, results.get(0).getTimestamp(),
- env.getRegion().getRegionInfo().getStartKey(),
- env.getRegion().getRegionInfo().getEndKey());
-
- if (txnProvider != null) {
- put = txnProvider.markPutAsCommitted(put, ts, ts);
- }
- indexMutations.add(put);
- }
- }
- result.setKeyValues(results);
- } else if (isDelete) {
- // FIXME: the version of the Delete constructor without the lock
- // args was introduced in 0.94.4, thus if we try to use it here
- // we can no longer use the 0.94.2 version of the client.
- Cell firstKV = results.get(0);
- Delete delete = new Delete(firstKV.getRowArray(),
- firstKV.getRowOffset(), firstKV.getRowLength(),ts);
- if (replayMutations != null) {
- delete.setAttribute(REPLAY_WRITES, replayMutations);
- }
- mutations.add(delete);
- // force tephra to ignore this deletes
- delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
- } else if (isUpsert) {
- Arrays.fill(values, null);
- int bucketNumOffset = 0;
- if (projectedTable.getBucketNum() != null) {
- values[0] = new byte[] { 0 };
- bucketNumOffset = 1;
- }
- int i = bucketNumOffset;
- List<PColumn> projectedColumns = projectedTable.getColumns();
- for (; i < projectedTable.getPKColumns().size(); i++) {
- Expression expression = selectExpressions.get(i - bucketNumOffset);
- if (expression.evaluate(result, ptr)) {
- values[i] = ptr.copyBytes();
- // If SortOrder from expression in SELECT doesn't match the
- // column being projected into then invert the bits.
- if (expression.getSortOrder() !=
- projectedColumns.get(i).getSortOrder()) {
- SortOrder.invert(values[i], 0, values[i], 0,
- values[i].length);
- }
- }else{
- values[i] = ByteUtil.EMPTY_BYTE_ARRAY;
- }
- }
- projectedTable.newKey(ptr, values);
- PRow row = projectedTable.newRow(kvBuilder, ts, ptr, false);
- for (; i < projectedColumns.size(); i++) {
- Expression expression = selectExpressions.get(i - bucketNumOffset);
- if (expression.evaluate(result, ptr)) {
- PColumn column = projectedColumns.get(i);
- if (!column.getDataType().isSizeCompatible(ptr, null,
- expression.getDataType(), expression.getSortOrder(),
- expression.getMaxLength(), expression.getScale(),
- column.getMaxLength(), column.getScale())) {
- throw new DataExceedsCapacityException(
- column.getDataType(),
- column.getMaxLength(),
- column.getScale(),
- column.getName().getString());
- }
- column.getDataType().coerceBytes(ptr, null,
- expression.getDataType(), expression.getMaxLength(),
- expression.getScale(), expression.getSortOrder(),
- column.getMaxLength(), column.getScale(),
- column.getSortOrder(), projectedTable.rowKeyOrderOptimizable());
- byte[] bytes = ByteUtil.copyKeyBytesIfNecessary(ptr);
- row.setValue(column, bytes);
- }
- }
- for (Mutation mutation : row.toRowMutations()) {
- if (replayMutations != null) {
- mutation.setAttribute(REPLAY_WRITES, replayMutations);
- } else if (txnProvider != null && projectedTable.getType() == PTableType.INDEX) {
- mutation = txnProvider.markPutAsCommitted((Put)mutation, ts, ts);
- }
- mutations.add(mutation);
- }
- for (i = 0; i < selectExpressions.size(); i++) {
- selectExpressions.get(i).reset();
- }
- } else if (deleteCF != null && deleteCQ != null) {
- // No need to search for delete column, since we project only it
- // if no empty key value is being set
- if (emptyCF == null ||
- result.getValue(deleteCF, deleteCQ) != null) {
- Delete delete = new Delete(results.get(0).getRowArray(),
- results.get(0).getRowOffset(),
- results.get(0).getRowLength());
- delete.addColumns(deleteCF, deleteCQ, ts);
- // force tephra to ignore this deletes
- delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
- mutations.add(delete);
- }
- }
- if (emptyCF != null) {
- /*
- * If we've specified an emptyCF, then we need to insert an empty
- * key value "retroactively" for any key value that is visible at
- * the timestamp that the DDL was issued. Key values that are not
- * visible at this timestamp will not ever be projected up to
- * scans past this timestamp, so don't need to be considered.
- * We insert one empty key value per row per timestamp.
- */
- Set<Long> timeStamps =
- Sets.newHashSetWithExpectedSize(results.size());
- for (Cell kv : results) {
- long kvts = kv.getTimestamp();
- if (!timeStamps.contains(kvts)) {
- Put put = new Put(kv.getRowArray(), kv.getRowOffset(),
- kv.getRowLength());
- put.addColumn(emptyCF, QueryConstants.EMPTY_COLUMN_BYTES, kvts,
- ByteUtil.EMPTY_BYTE_ARRAY);
- mutations.add(put);
- }
- }
- }
- if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
- commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr,
- txState, targetHTable, useIndexProto, isPKChanging, clientVersionBytes);
- mutations.clear();
- }
- // Commit in batches based on UPSERT_BATCH_SIZE_BYTES_ATTRIB in config
-
- if (ServerUtil.readyToCommit(indexMutations.size(), indexMutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
- setIndexAndTransactionProperties(indexMutations, indexUUID, indexMaintainersPtr, txState, clientVersionBytes, useIndexProto);
- commitBatch(region, indexMutations, blockingMemStoreSize);
- indexMutations.clear();
- }
- aggregators.aggregate(rowAggregators, result);
- hasAny = true;
- }
- } while (hasMore);
- if (!mutations.isEmpty()) {
- commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, txState,
- targetHTable, useIndexProto, isPKChanging, clientVersionBytes);
- mutations.clear();
- }
-
- if (!indexMutations.isEmpty()) {
- commitBatch(region, indexMutations, blockingMemStoreSize);
- indexMutations.clear();
- }
- }
- } finally {
- if (needToWrite && incrScanRefCount) {
- synchronized (lock) {
- scansReferenceCount--;
- if (scansReferenceCount < 0) {
- LOGGER.warn(
- "Scan reference count went below zero. Something isn't correct. Resetting it back to zero");
- scansReferenceCount = 0;
- }
- lock.notifyAll();
- }
- }
- try {
- if (targetHTable != null) {
- try {
- targetHTable.close();
- } catch (IOException e) {
- LOGGER.error("Closing table: " + targetHTable + " failed: ", e);
- }
- }
- if (targetHConn != null) {
- try {
- targetHConn.close();
- } catch (IOException e) {
- LOGGER.error("Closing connection: " + targetHConn + " failed: ", e);
- }
- }
- } finally {
- try {
- innerScanner.close();
- } finally {
- if (acquiredLock) region.closeRegionOperation();
- }
- }
- }
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(LogUtil.addCustomAnnotations("Finished scanning " + rowCount + " rows for ungrouped coprocessor scan " + scan, ScanUtil.getCustomAnnotations(scan)));
- }
-
- final boolean hadAny = hasAny;
- Cell keyValue = null;
- if (hadAny) {
- byte[] value = aggregators.toBytes(rowAggregators);
- keyValue = PhoenixKeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY,
- SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length);
+ if (j != null) {
+ theScanner = new HashJoinRegionScanner(theScanner, p, j, ScanUtil.getTenantId(scan), env, useQualifierAsIndex, useNewValueColumnQualifier);
}
- final Cell aggKeyValue = keyValue;
-
- RegionScanner scanner = new BaseRegionScanner(innerScanner) {
- private boolean done = !hadAny;
-
- @Override
- public boolean isFilterDone() {
- return done;
- }
-
- @Override
- public boolean next(List<Cell> results) throws IOException {
- if (done) return false;
- done = true;
- results.add(aggKeyValue);
- return false;
- }
-
- @Override
- public long getMaxResultSize() {
- return scan.getMaxResultSize();
- }
- };
- return scanner;
-
+ return new UngroupedAggregateRegionScanner(c, theScanner, region, scan, env, this);
}
- private void checkForLocalIndexColumnFamilies(Region region,
- List<IndexMaintainer> indexMaintainers) throws IOException {
+ public static void checkForLocalIndexColumnFamilies(Region region,
+ List<IndexMaintainer> indexMaintainers) throws IOException {
TableDescriptor tableDesc = region.getTableDescriptor();
String schemaName =
tableDesc.getTableName().getNamespaceAsString()
@@ -956,13 +469,13 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
String tableName = SchemaUtil.getTableNameFromFullName(tableDesc.getTableName().getNameAsString());
for (IndexMaintainer indexMaintainer : indexMaintainers) {
Set<ColumnReference> coveredColumns = indexMaintainer.getCoveredColumns();
- if(coveredColumns.isEmpty()) {
+ if (coveredColumns.isEmpty()) {
byte[] localIndexCf = indexMaintainer.getEmptyKeyValueFamily().get();
// When covered columns are empty we store index data in the default column family, so check for it.
if (tableDesc.getColumnFamily(localIndexCf) == null) {
ServerUtil.throwIOException("Column Family Not Found",
- new ColumnFamilyNotFoundException(schemaName, tableName, Bytes
- .toString(localIndexCf)));
+ new ColumnFamilyNotFoundException(schemaName, tableName, Bytes
+ .toString(localIndexCf)));
}
}
for (ColumnReference reference : coveredColumns) {
@@ -970,21 +483,21 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
ColumnFamilyDescriptor family = region.getTableDescriptor().getColumnFamily(cf);
if (family == null) {
ServerUtil.throwIOException("Column Family Not Found",
- new ColumnFamilyNotFoundException(schemaName, tableName, Bytes.toString(cf)));
+ new ColumnFamilyNotFoundException(schemaName, tableName, Bytes.toString(cf)));
}
}
}
}
- private void commit(final Region region, List<Mutation> mutations, byte[] indexUUID, final long blockingMemStoreSize,
- byte[] indexMaintainersPtr, byte[] txState, final Table targetHTable, boolean useIndexProto,
- boolean isPKChanging, byte[] clientVersionBytes)
+ void commit(final Region region, List<Mutation> mutations, byte[] indexUUID, final long blockingMemStoreSize,
+ byte[] indexMaintainersPtr, byte[] txState, final Table targetHTable, boolean useIndexProto,
+ boolean isPKChanging, byte[] clientVersionBytes)
throws IOException {
final List<Mutation> localRegionMutations = Lists.newArrayList();
final List<Mutation> remoteRegionMutations = Lists.newArrayList();
setIndexAndTransactionProperties(mutations, indexUUID, indexMaintainersPtr, txState, clientVersionBytes, useIndexProto);
separateLocalAndRemoteMutations(targetHTable, region, mutations, localRegionMutations, remoteRegionMutations,
- isPKChanging);
+ isPKChanging);
commitBatchWithRetries(region, localRegionMutations, blockingMemStoreSize);
try {
commitBatchWithTable(targetHTable, remoteRegionMutations);
@@ -1006,7 +519,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
private void handleIndexWriteException(final List<Mutation> localRegionMutations, IOException origIOE,
- MutateCommand mutateCommand) throws IOException {
+ MutateCommand mutateCommand) throws IOException {
long serverTimestamp = ServerUtil.parseTimestampFromRemoteException(origIOE);
SQLException inferredE = ServerUtil.parseLocalOrRemoteServerException(origIOE);
if (inferredE != null && inferredE.getErrorCode() == SQLExceptionCode.INDEX_WRITE_FAILURE.getErrorCode()) {
@@ -1015,20 +528,20 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
for (Mutation mutation : localRegionMutations) {
if (PhoenixIndexMetaData.isIndexRebuild(mutation.getAttributesMap())) {
mutation.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
- BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
+ BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
} else {
mutation.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
- BaseScannerRegionObserver.REPLAY_ONLY_INDEX_WRITES);
+ BaseScannerRegionObserver.REPLAY_ONLY_INDEX_WRITES);
}
// use the server timestamp for index write retries
PhoenixKeyValueUtil.setTimestamp(mutation, serverTimestamp);
}
IndexWriteException iwe = PhoenixIndexFailurePolicy.getIndexWriteException(inferredE);
try (PhoenixConnection conn =
- QueryUtil.getConnectionOnServer(indexWriteConfig)
- .unwrap(PhoenixConnection.class)) {
+ QueryUtil.getConnectionOnServer(indexWriteConfig)
+ .unwrap(PhoenixConnection.class)) {
PhoenixIndexFailurePolicy.doBatchWithRetries(mutateCommand, iwe, conn,
- indexWriteProps);
+ indexWriteProps);
} catch (Exception e) {
throw new DoNotRetryIOException(e);
}
@@ -1039,14 +552,14 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
private void separateLocalAndRemoteMutations(Table targetHTable, Region region, List<Mutation> mutations,
List<Mutation> localRegionMutations, List<Mutation> remoteRegionMutations,
- boolean isPKChanging){
+ boolean isPKChanging) {
boolean areMutationsInSameTable = areMutationsInSameTable(targetHTable, region);
//if we're writing to the same table, but the PK can change, that means that some
//mutations might be in our current region, and others in a different one.
if (areMutationsInSameTable && isPKChanging) {
RegionInfo regionInfo = region.getRegionInfo();
- for (Mutation mutation : mutations){
- if (regionInfo.containsRow(mutation.getRow())){
+ for (Mutation mutation : mutations) {
+ if (regionInfo.containsRow(mutation.getRow())) {
localRegionMutations.add(mutation);
} else {
remoteRegionMutations.add(mutation);
@@ -1066,9 +579,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
@Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
- InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
- CompactionRequest request)
- throws IOException {
+ InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
+ CompactionRequest request) throws IOException {
if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
final TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
// Compaction and split upcalls run with the effective user context of the requesting user.
@@ -1076,7 +588,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
// the login user. Switch to the login user context to ensure we have the expected
// security context.
return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() {
- @Override public InternalScanner run() throws Exception {
+ @Override
+ public InternalScanner run() throws Exception {
InternalScanner internalScanner = scanner;
try {
long clientTimeStamp = EnvironmentEdgeManager.currentTimeMillis();
@@ -1084,8 +597,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
new DelegateRegionCoprocessorEnvironment(c.getEnvironment(),
ConnectionType.COMPACTION_CONNECTION);
StatisticsCollector statisticsCollector = StatisticsCollectorFactory.createStatisticsCollector(
- compactionConfEnv, table.getNameAsString(), clientTimeStamp,
- store.getColumnFamilyDescriptor().getName());
+ compactionConfEnv, table.getNameAsString(), clientTimeStamp,
+ store.getColumnFamilyDescriptor().getName());
statisticsCollector.init();
internalScanner = statisticsCollector.createCompactionScanner(compactionConfEnv, store, internalScanner);
} catch (Exception e) {
@@ -1102,7 +615,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
return scanner;
}
- private static PTable deserializeTable(byte[] b) {
+ static PTable deserializeTable(byte[] b) {
try {
PTableProtos.PTable ptableProto = PTableProtos.PTable.parseFrom(b);
return PTableImpl.createFromProto(ptableProto);
@@ -1119,19 +632,19 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
} else {
if (region.getTableDescriptor().hasCoprocessor(GlobalIndexChecker.class.getCanonicalName())) {
return new IndexRepairRegionScanner(innerScanner, region, scan, env, this);
- } else {
+ } else {
return new IndexRebuildRegionScanner(innerScanner, region, scan, env, this);
}
}
-
}
+
private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Region region, final Scan scan,
final RegionCoprocessorEnvironment env) throws IOException {
boolean oldCoproc = region.getTableDescriptor().hasCoprocessor(Indexer.class.getCanonicalName());
byte[] valueBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE);
IndexTool.IndexVerifyType verifyType = (valueBytes != null) ?
IndexTool.IndexVerifyType.fromValue(valueBytes) : IndexTool.IndexVerifyType.NONE;
- if (oldCoproc && verifyType == IndexTool.IndexVerifyType.ONLY) {
+ if (oldCoproc && verifyType == IndexTool.IndexVerifyType.ONLY) {
return new IndexerRegionScanner(innerScanner, region, scan, env, this);
}
if (!scan.isRaw()) {
@@ -1158,9 +671,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
return getRegionScanner(innerScanner, region, scan, env, oldCoproc);
}
-
+
private RegionScanner collectStats(final RegionScanner innerScanner, StatisticsCollector stats,
- final Region region, final Scan scan, Configuration config) throws IOException {
+ final Region region, final Scan scan, Configuration config) throws IOException {
StatsCollectionCallable callable =
new StatsCollectionCallable(stats, region, innerScanner, config, scan);
byte[] asyncBytes = scan.getAttribute(BaseScannerRegionObserver.RUN_UPDATE_STATS_ASYNC_ATTRIB);
@@ -1171,7 +684,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
long rowCount = 0; // in case of async, we report 0 as number of rows updated
StatisticsCollectionRunTracker statsRunTracker =
StatisticsCollectionRunTracker.getInstance(config);
- final boolean runUpdateStats = statsRunTracker.addUpdateStatsCommandRegion(region.getRegionInfo(),scan.getFamilyMap().keySet());
+ final boolean runUpdateStats = statsRunTracker.addUpdateStatsCommandRegion(region.getRegionInfo(), scan.getFamilyMap().keySet());
if (runUpdateStats) {
if (!async) {
rowCount = callable.call();
@@ -1186,7 +699,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount));
final Cell aggKeyValue =
PhoenixKeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY,
- SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length);
+ SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length);
RegionScanner scanner = new BaseRegionScanner(innerScanner) {
@Override
public RegionInfo getRegionInfo() {
@@ -1236,7 +749,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
private final Scan scan;
StatsCollectionCallable(StatisticsCollector s, Region r, RegionScanner rs,
- Configuration config, Scan scan) {
+ Configuration config, Scan scan) {
this.statsCollector = s;
this.region = r;
this.innerScanner = rs;
@@ -1307,7 +820,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
}
- private static List<Expression> deserializeExpressions(byte[] b) {
+ static List<Expression> deserializeExpressions(byte[] b) {
ByteArrayInputStream stream = new ByteArrayInputStream(b);
try {
DataInputStream input = new DataInputStream(stream);
@@ -1362,7 +875,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
// the coprocessors to avoid a deadlock scenario. See PHOENIX-3111.
private void waitForScansToFinish(ObserverContext<RegionCoprocessorEnvironment> c) throws IOException {
int maxWaitTime = c.getEnvironment().getConfiguration().getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
- HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
+ HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
long start = EnvironmentEdgeManager.currentTimeMillis();
synchronized (lock) {
isRegionClosingOrSplitting = true;
@@ -1372,10 +885,10 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
if (EnvironmentEdgeManager.currentTimeMillis() - start >= maxWaitTime) {
isRegionClosingOrSplitting = false; // must reset in case split is not retried
throw new IOException(String.format(
- "Operations like local index building/delete/upsert select"
- + " might be going on so not allowing to split/close. scansReferenceCount=%s region=%s",
- scansReferenceCount,
- c.getEnvironment().getRegionInfo().getRegionNameAsString()));
+ "Operations like local index building/delete/upsert select"
+ + " might be going on so not allowing to split/close. scansReferenceCount=%s region=%s",
+ scansReferenceCount,
+ c.getEnvironment().getRegionInfo().getRegionNameAsString()));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -1386,7 +899,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
@Override
public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> c,
- List<Pair<byte[], String>> familyPaths) throws IOException {
+ List<Pair<byte[], String>> familyPaths) throws IOException {
// Don't allow bulkload if operations that need to read and write to the same region are going on in
// the coprocessors, to avoid a deadlock scenario. See PHOENIX-3111.
synchronized (lock) {
@@ -1416,6 +929,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
// This will lead to failure of cross cluster RPC if the effective user is not
// the login user. Switch to the login user context to ensure we have the expected
// security context.
+
final String fullTableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
// since we will make a call to syscat, do nothing if we are compacting syscat itself
if (request.isMajor() && !PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME.equals(fullTableName)) {
@@ -1424,15 +938,15 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
public Void run() throws Exception {
// If the index is disabled, keep the deleted cells so the rebuild doesn't corrupt the index
try (PhoenixConnection conn =
- QueryUtil.getConnectionOnServer(compactionConfig).unwrap(PhoenixConnection.class)) {
+ QueryUtil.getConnectionOnServer(compactionConfig).unwrap(PhoenixConnection.class)) {
PTable table = PhoenixRuntime.getTableNoCache(conn, fullTableName);
List<PTable> indexes = PTableType.INDEX.equals(table.getType()) ? Lists.newArrayList(table) : table.getIndexes();
// FIXME need to handle views and indexes on views as well
for (PTable index : indexes) {
if (index.getIndexDisableTimestamp() != 0) {
LOGGER.info(
- "Modifying major compaction scanner to retain deleted cells for a table with disabled index: "
- + fullTableName);
+ "Modifying major compaction scanner to retain deleted cells for a table with disabled index: "
+ + fullTableName);
options.setKeepDeletedCells(KeepDeletedCells.TRUE);
options.readAllVersions();
options.setTTL(Long.MAX_VALUE);
@@ -1444,8 +958,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
// non-Phoenix HBase tables won't be found, do nothing
} else {
LOGGER.error("Unable to modify compaction scanner to retain deleted cells for a table with disabled Index; "
- + fullTableName,
- e);
+ + fullTableName, e);
}
}
return null;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
new file mode 100644
index 0000000..844273a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
@@ -0,0 +1,670 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.LOCAL_INDEX_BUILD;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.REPLAY_WRITES;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.UPGRADE_DESC_ROW_KEY;
+import static org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver.checkForLocalIndexColumnFamilies;
+import static org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver.deserializeExpressions;
+import static org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver.deserializeTable;
+import static org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver.getBlockingMemstoreSize;
+import static org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver.setIndexAndTransactionProperties;
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS;
+import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import com.google.common.primitives.Ints;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.exception.DataExceedsCapacityException;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.memory.InsufficientMemoryException;
+import org.apache.phoenix.memory.MemoryManager;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PRow;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.RowKeySchema;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.ValueSchema;
+import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PBinary;
+import org.apache.phoenix.schema.types.PChar;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PDouble;
+import org.apache.phoenix.schema.types.PFloat;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.transaction.PhoenixTransactionProvider;
+import org.apache.phoenix.transaction.TransactionFactory;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ExpressionUtil;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.LogUtil;
+import org.apache.phoenix.util.PhoenixKeyValueUtil;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.StringUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(UngroupedAggregateRegionScanner.class);
+
+ private long pageSizeInMs = Long.MAX_VALUE;
+ private int maxBatchSize = 0;
+ private Scan scan;
+ private RegionScanner innerScanner;
+ private Region region;
+ private final UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver;
+ private final RegionCoprocessorEnvironment env;
+ private final boolean useQualifierAsIndex;
+ private boolean needToWrite = false;
+ private final Pair<Integer, Integer> minMaxQualifiers;
+ private byte[][] values = null;
+ private PTable.QualifierEncodingScheme encodingScheme;
+ private PTable writeToTable = null;
+ private PTable projectedTable = null;
+ private boolean isDescRowKeyOrderUpgrade;
+ private final int offset;
+ private boolean buildLocalIndex;
+ private List<IndexMaintainer> indexMaintainers;
+ private boolean isPKChanging = false;
+ private long ts;
+ private PhoenixTransactionProvider txnProvider = null;
+ private UngroupedAggregateRegionObserver.MutationList indexMutations;
+ private boolean isDelete = false;
+ private byte[] replayMutations;
+ private boolean isUpsert = false;
+ private List<Expression> selectExpressions = null;
+ private byte[] deleteCQ = null;
+ private byte[] deleteCF = null;
+ private byte[] emptyCF = null;
+ private byte[] indexUUID;
+ private byte[] txState;
+ private byte[] clientVersionBytes;
+ private long blockingMemStoreSize;
+ private long maxBatchSizeBytes = 0L;
+ private Table targetHTable = null;
+ private boolean incrScanRefCount = false;
+ private byte[] indexMaintainersPtr;
+ private boolean useIndexProto;
+ private Connection targetHConn = null;
+
+ public UngroupedAggregateRegionScanner(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final RegionScanner innerScanner, final Region region, final Scan scan,
+ final RegionCoprocessorEnvironment env,
+ final UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver)
+ throws IOException, SQLException {
+ super(innerScanner);
+ this.env = env;
+ this.region = region;
+ this.scan = scan;
+ this.ungroupedAggregateRegionObserver = ungroupedAggregateRegionObserver;
+ this.innerScanner = innerScanner;
+ Configuration conf = env.getConfiguration();
+ if (scan.getAttribute(BaseScannerRegionObserver.SERVER_PAGING) != null) {
+ byte[] pageSizeFromScan =
+ scan.getAttribute(BaseScannerRegionObserver.AGGREGATE_PAGE_SIZE_IN_MS);
+ if (pageSizeFromScan != null) {
+ pageSizeInMs = Bytes.toLong(pageSizeFromScan);
+ } else {
+ pageSizeInMs =
+ conf.getLong(UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS,
+ QueryServicesOptions.DEFAULT_UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS);
+ }
+ }
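+ // pageSizeInMs stays Long.MAX_VALUE (i.e., no paging) unless the client set the
+ // SERVER_PAGING attribute; a scan-level page size, when present, overrides the
+ // server configuration.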
+ ts = scan.getTimeRange().getMax();
+ boolean localIndexScan = ScanUtil.isLocalIndex(scan);
+ encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
+ int offsetToBe = 0;
+ if (localIndexScan) {
+ /*
+ * For local indexes, we need to set an offset on row key expressions to skip
+ * the region start key.
+ */
+ offsetToBe = region.getRegionInfo().getStartKey().length != 0 ? region.getRegionInfo().getStartKey().length :
+ region.getRegionInfo().getEndKey().length;
+ ScanUtil.setRowKeyOffset(scan, offsetToBe);
+ }
+ offset = offsetToBe;
+
+ byte[] descRowKeyTableBytes = scan.getAttribute(UPGRADE_DESC_ROW_KEY);
+ isDescRowKeyOrderUpgrade = descRowKeyTableBytes != null;
+ if (isDescRowKeyOrderUpgrade) {
+ LOGGER.debug("Upgrading row key for " + region.getRegionInfo().getTable().getNameAsString());
+ projectedTable = deserializeTable(descRowKeyTableBytes);
+ try {
+ writeToTable = PTableImpl.builderWithColumns(projectedTable,
+ getColumnsToClone(projectedTable))
+ .setRowKeyOrderOptimizable(true)
+ .build();
+ } catch (SQLException e) {
+ ServerUtil.throwIOException("Upgrade failed", e); // Impossible
+ }
+ values = new byte[projectedTable.getPKColumns().size()][];
+ }
+ boolean useProto = false;
+ byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD_PROTO);
+ useProto = localIndexBytes != null;
+ if (localIndexBytes == null) {
+ localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
+ }
+ indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes, useProto);
+ indexMutations = localIndexBytes == null ? new UngroupedAggregateRegionObserver.MutationList() : new UngroupedAggregateRegionObserver.MutationList(1024);
+
+ replayMutations = scan.getAttribute(REPLAY_WRITES);
+ indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID);
+ txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE);
+ clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
+ if (txState != null) {
+ int clientVersion = clientVersionBytes == null ? ScanUtil.UNKNOWN_CLIENT_VERSION : Bytes.toInt(clientVersionBytes);
+ txnProvider = TransactionFactory.getTransactionProvider(txState, clientVersion);
+ }
+ byte[] upsertSelectTable = scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_TABLE);
+ if (upsertSelectTable != null) {
+ isUpsert = true;
+ projectedTable = deserializeTable(upsertSelectTable);
+ targetHConn = ConnectionFactory.createConnection(ungroupedAggregateRegionObserver.getUpsertSelectConfig());
+ targetHTable = targetHConn.getTable(
+ TableName.valueOf(projectedTable.getPhysicalName().getBytes()));
+ selectExpressions = deserializeExpressions(scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS));
+ values = new byte[projectedTable.getPKColumns().size()][];
+ isPKChanging = ExpressionUtil.isPkPositionChanging(new TableRef(projectedTable), selectExpressions);
+ } else {
+ byte[] isDeleteAgg = scan.getAttribute(BaseScannerRegionObserver.DELETE_AGG);
+ isDelete = isDeleteAgg != null && Bytes.compareTo(PDataType.TRUE_BYTES, isDeleteAgg) == 0;
+ if (!isDelete) {
+ deleteCF = scan.getAttribute(BaseScannerRegionObserver.DELETE_CF);
+ deleteCQ = scan.getAttribute(BaseScannerRegionObserver.DELETE_CQ);
+ }
+ emptyCF = scan.getAttribute(BaseScannerRegionObserver.EMPTY_CF);
+ }
+ ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan);
+ useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan));
+
+ /*
+ * Slow down the writes if the memstore size is more than
+ * (hbase.hregion.memstore.block.multiplier - 1) times hbase.hregion.memstore.flush.size
+ * bytes. This avoids a flush storm to HDFS for cases like index building, where reads
+ * and writes happen to all the table regions on the server.
+ */
+ blockingMemStoreSize = getBlockingMemstoreSize(region, conf);
+
+ buildLocalIndex = indexMaintainers != null && dataColumns == null && !localIndexScan;
+ if (buildLocalIndex) {
+ checkForLocalIndexColumnFamilies(region, indexMaintainers);
+ }
+ if (isDescRowKeyOrderUpgrade || isDelete || isUpsert
+ || (deleteCQ != null && deleteCF != null) || emptyCF != null || buildLocalIndex) {
+ needToWrite = true;
+ if (isUpsert && (targetHTable == null ||
+ !targetHTable.getName().equals(region.getTableDescriptor().getTableName()))) {
+ needToWrite = false;
+ }
+ maxBatchSize = conf.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+ maxBatchSizeBytes = conf.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
+ QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
+ }
+ minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(LogUtil.addCustomAnnotations("Starting ungrouped coprocessor scan " + scan + " " + region.getRegionInfo(), ScanUtil.getCustomAnnotations(scan)));
+ }
+ useIndexProto = true;
+ indexMaintainersPtr = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
+ // for backward compatibility, fall back to looking up the old attribute
+ if (indexMaintainersPtr == null) {
+ indexMaintainersPtr = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
+ useIndexProto = false;
+ }
+
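+ // Pair this increment with the decrement in close() so the observer can wait for
+ // in-flight writing scanners before letting the region close or split.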
+ if (needToWrite) {
+ ungroupedAggregateRegionObserver.incrementScansReferenceCount();
+ incrScanRefCount = true;
+ }
+ }
+
+ @Override
+ public RegionInfo getRegionInfo() {
+ return region.getRegionInfo();
+ }
+
+ @Override
+ public boolean isFilterDone() {
+ return false;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (needToWrite && incrScanRefCount) {
+ ungroupedAggregateRegionObserver.decrementScansReferenceCount();
+ }
+ try {
+ if (targetHTable != null) {
+ try {
+ targetHTable.close();
+ } catch (IOException e) {
+ LOGGER.error("Closing table: " + targetHTable + " failed: ", e);
+ }
+ }
+ if (targetHConn != null) {
+ try {
+ targetHConn.close();
+ } catch (IOException e) {
+ LOGGER.error("Closing connection: " + targetHConn + " failed: ", e);
+ }
+ }
+ } finally {
+ innerScanner.close();
+ }
+ }
+
+ boolean descRowKeyOrderUpgrade(List<Cell> results, ImmutableBytesWritable ptr,
+ UngroupedAggregateRegionObserver.MutationList mutations) throws IOException {
+ Arrays.fill(values, null);
+ Cell firstKV = results.get(0);
+ RowKeySchema schema = projectedTable.getRowKeySchema();
+ int maxOffset = schema.iterator(firstKV.getRowArray(), firstKV.getRowOffset() + offset, firstKV.getRowLength(), ptr);
+ for (int i = 0; i < schema.getFieldCount(); i++) {
+ Boolean hasValue = schema.next(ptr, i, maxOffset);
+ if (hasValue == null) {
+ break;
+ }
+ ValueSchema.Field field = schema.getField(i);
+ if (field.getSortOrder() == SortOrder.DESC) {
+ // Special case for re-writing DESC ARRAY, as the actual byte value needs to change in this case
+ if (field.getDataType().isArrayType()) {
+ field.getDataType().coerceBytes(ptr, null, field.getDataType(),
+ field.getMaxLength(), field.getScale(), field.getSortOrder(),
+ field.getMaxLength(), field.getScale(), field.getSortOrder(), true); // force to use correct separator byte
+ }
+ // Special case for re-writing DESC CHAR or DESC BINARY, to force the re-writing of trailing space characters
+ else if (field.getDataType() == PChar.INSTANCE || field.getDataType() == PBinary.INSTANCE) {
+ int len = ptr.getLength();
+ while (len > 0 && ptr.get()[ptr.getOffset() + len - 1] == StringUtil.SPACE_UTF8) {
+ len--;
+ }
+ ptr.set(ptr.get(), ptr.getOffset(), len);
+ } else if (field.getDataType() == PFloat.INSTANCE || field.getDataType() == PDouble.INSTANCE) {
+ // Special case for re-writing DESC FLOAT and DOUBLE, as they're not inverted like they should be (PHOENIX-2171)
+ byte[] invertedBytes = SortOrder.invert(ptr.get(), ptr.getOffset(), ptr.getLength());
+ ptr.set(invertedBytes);
+ }
+ } else if (field.getDataType() == PBinary.INSTANCE) {
+ // Remove trailing space characters so that the setValues call below will replace them
+ // with the correct zero byte character. Note this is somewhat dangerous as these
+ // could be legit, but I don't know what the alternative is.
+ int len = ptr.getLength();
+ while (len > 0 && ptr.get()[ptr.getOffset() + len - 1] == StringUtil.SPACE_UTF8) {
+ len--;
+ }
+ ptr.set(ptr.get(), ptr.getOffset(), len);
+ }
+ values[i] = ptr.copyBytes();
+ }
+ writeToTable.newKey(ptr, values);
+ if (Bytes.compareTo(
+ firstKV.getRowArray(), firstKV.getRowOffset() + offset, firstKV.getRowLength(),
+ ptr.get(),ptr.getOffset() + offset,ptr.getLength()) == 0) {
+ return false;
+ }
+ byte[] newRow = ByteUtil.copyKeyBytesIfNecessary(ptr);
+ if (offset > 0) { // for local indexes (prepend region start key)
+ byte[] newRowWithOffset = new byte[offset + newRow.length];
+ System.arraycopy(firstKV.getRowArray(), firstKV.getRowOffset(), newRowWithOffset, 0, offset);
+ System.arraycopy(newRow, 0, newRowWithOffset, offset, newRow.length);
+ newRow = newRowWithOffset;
+ }
+ byte[] oldRow = Bytes.copy(firstKV.getRowArray(), firstKV.getRowOffset(), firstKV.getRowLength());
+ for (Cell cell : results) {
+ // Copy existing cell but with new row key
+ Cell newCell =
+ CellBuilderFactory.create(CellBuilderType.DEEP_COPY).
+ setRow(newRow).
+ setFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()).
+ setQualifier(cell.getQualifierArray(),
+ cell.getQualifierOffset(), cell.getQualifierLength()).
+ setTimestamp(cell.getTimestamp()).
+ setType(cell.getType()).setValue(cell.getValueArray(),
+ cell.getValueOffset(), cell.getValueLength()).build();
+ switch (cell.getType()) {
+ case Put:
+ // If Put, point delete old Put
+ Delete del = new Delete(oldRow);
+ Cell newDelCell =
+ CellBuilderFactory.create(CellBuilderType.DEEP_COPY).
+ setRow(newRow).
+ setFamily(cell.getFamilyArray(), cell.getFamilyOffset(),
+ cell.getFamilyLength()).
+ setQualifier(cell.getQualifierArray(),
+ cell.getQualifierOffset(), cell.getQualifierLength()).
+ setTimestamp(cell.getTimestamp()).
+ setType(Cell.Type.Delete).
+ setValue(ByteUtil.EMPTY_BYTE_ARRAY,
+ 0, 0).build();
+ del.add(newDelCell);
+ mutations.add(del);
+
+ Put put = new Put(newRow);
+ put.add(newCell);
+ mutations.add(put);
+ break;
+ case Delete:
+ case DeleteColumn:
+ case DeleteFamily:
+ case DeleteFamilyVersion:
+ Delete delete = new Delete(newRow);
+ delete.add(newCell);
+ mutations.add(delete);
+ break;
+ }
+ }
+ return true;
+ }
+
+ void buildLocalIndex(Tuple result, List<Cell> results, ImmutableBytesWritable ptr) throws IOException {
+ for (IndexMaintainer maintainer : indexMaintainers) {
+ if (!results.isEmpty()) {
+ result.getKey(ptr);
+ ValueGetter valueGetter =
+ maintainer.createGetterFromKeyValues(
+ ImmutableBytesPtr.copyBytesIfNecessary(ptr),
+ results);
+ Put put = maintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
+ valueGetter, ptr, results.get(0).getTimestamp(),
+ env.getRegion().getRegionInfo().getStartKey(),
+ env.getRegion().getRegionInfo().getEndKey());
+
+ if (txnProvider != null) {
+ put = txnProvider.markPutAsCommitted(put, ts, ts);
+ }
+ indexMutations.add(put);
+ }
+ }
+ result.setKeyValues(results);
+ }
+
+ void deleteRow(List<Cell> results, UngroupedAggregateRegionObserver.MutationList mutations) {
+ // FIXME: the version of the Delete constructor without the lock
+ // args was introduced in 0.94.4, thus if we try to use it here
+ // we can no longer use the 0.94.2 version of the client.
+ Cell firstKV = results.get(0);
+ Delete delete = new Delete(firstKV.getRowArray(),
+ firstKV.getRowOffset(), firstKV.getRowLength(),ts);
+ if (replayMutations != null) {
+ delete.setAttribute(REPLAY_WRITES, replayMutations);
+ }
+ mutations.add(delete);
+ // force Tephra to ignore this delete
+ delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+ }
+
+ void deleteCForQ(Tuple result, List<Cell> results, UngroupedAggregateRegionObserver.MutationList mutations) {
+ // No need to search for the delete column, since it is the only column projected
+ // when no empty key value is being set
+ if (emptyCF == null ||
+ result.getValue(deleteCF, deleteCQ) != null) {
+ Delete delete = new Delete(results.get(0).getRowArray(),
+ results.get(0).getRowOffset(),
+ results.get(0).getRowLength());
+ delete.addColumns(deleteCF, deleteCQ, ts);
+ // force Tephra to ignore this delete
+ delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+ mutations.add(delete);
+ }
+ }
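+
+ // Evaluates the select expressions against one source row, rebuilds the projected
+ // row key and column values, and queues the resulting row mutations.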
+ void upsert(Tuple result, ImmutableBytesWritable ptr, UngroupedAggregateRegionObserver.MutationList mutations) {
+ Arrays.fill(values, null);
+ int bucketNumOffset = 0;
+ if (projectedTable.getBucketNum() != null) {
+ values[0] = new byte[] { 0 };
+ bucketNumOffset = 1;
+ }
+ int i = bucketNumOffset;
+ List<PColumn> projectedColumns = projectedTable.getColumns();
+ for (; i < projectedTable.getPKColumns().size(); i++) {
+ Expression expression = selectExpressions.get(i - bucketNumOffset);
+ if (expression.evaluate(result, ptr)) {
+ values[i] = ptr.copyBytes();
+ // If SortOrder from expression in SELECT doesn't match the
+ // column being projected into then invert the bits.
+ if (expression.getSortOrder() !=
+ projectedColumns.get(i).getSortOrder()) {
+ SortOrder.invert(values[i], 0, values[i], 0,
+ values[i].length);
+ }
+ } else {
+ values[i] = ByteUtil.EMPTY_BYTE_ARRAY;
+ }
+ }
+ projectedTable.newKey(ptr, values);
+ PRow row = projectedTable.newRow(GenericKeyValueBuilder.INSTANCE, ts, ptr, false);
+ for (; i < projectedColumns.size(); i++) {
+ Expression expression = selectExpressions.get(i - bucketNumOffset);
+ if (expression.evaluate(result, ptr)) {
+ PColumn column = projectedColumns.get(i);
+ if (!column.getDataType().isSizeCompatible(ptr, null,
+ expression.getDataType(), expression.getSortOrder(),
+ expression.getMaxLength(), expression.getScale(),
+ column.getMaxLength(), column.getScale())) {
+ throw new DataExceedsCapacityException(
+ column.getDataType(),
+ column.getMaxLength(),
+ column.getScale(),
+ column.getName().getString());
+ }
+ column.getDataType().coerceBytes(ptr, null,
+ expression.getDataType(), expression.getMaxLength(),
+ expression.getScale(), expression.getSortOrder(),
+ column.getMaxLength(), column.getScale(),
+ column.getSortOrder(), projectedTable.rowKeyOrderOptimizable());
+ byte[] bytes = ByteUtil.copyKeyBytesIfNecessary(ptr);
+ row.setValue(column, bytes);
+ }
+ }
+ for (Mutation mutation : row.toRowMutations()) {
+ if (replayMutations != null) {
+ mutation.setAttribute(REPLAY_WRITES, replayMutations);
+ } else if (txnProvider != null && projectedTable.getType() == PTableType.INDEX) {
+ mutation = txnProvider.markPutAsCommitted((Put)mutation, ts, ts);
+ }
+ mutations.add(mutation);
+ }
+ for (i = 0; i < selectExpressions.size(); i++) {
+ selectExpressions.get(i).reset();
+ }
+ }
+
+ void insertEmptyKeyValue(List<Cell> results, UngroupedAggregateRegionObserver.MutationList mutations) {
+ Set<Long> timeStamps =
+ Sets.newHashSetWithExpectedSize(results.size());
+ for (Cell kv : results) {
+ long kvts = kv.getTimestamp();
+ if (!timeStamps.contains(kvts)) {
+ Put put = new Put(kv.getRowArray(), kv.getRowOffset(),
+ kv.getRowLength());
+ put.addColumn(emptyCF, QueryConstants.EMPTY_COLUMN_BYTES, kvts,
+ ByteUtil.EMPTY_BYTE_ARRAY);
+ mutations.add(put);
+ }
+ }
+ }
+
+ @Override
+ public boolean next(List<Cell> resultsToReturn) throws IOException {
+ boolean hasMore;
+ long startTime = EnvironmentEdgeManager.currentTimeMillis();
+ Configuration conf = env.getConfiguration();
+ final TenantCache tenantCache = GlobalCache.getTenantCache(env, ScanUtil.getTenantId(scan));
+ try (MemoryManager.MemoryChunk em = tenantCache.getMemoryManager().allocate(0)) {
+ Aggregators aggregators = ServerAggregators.deserialize(
+ scan.getAttribute(BaseScannerRegionObserver.AGGREGATORS), conf, em);
+ Aggregator[] rowAggregators = aggregators.getAggregators();
+ aggregators.reset(rowAggregators);
+ Cell lastCell = null;
+ boolean hasAny = false;
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
+ UngroupedAggregateRegionObserver.MutationList mutations = new UngroupedAggregateRegionObserver.MutationList();
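+ // Writing scans pre-size the list slightly above maxBatchSize so a batch can fill
+ // up completely before a commit-and-clear is triggered.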
+ if (isDescRowKeyOrderUpgrade || isDelete || isUpsert
+ || (deleteCQ != null && deleteCF != null) || emptyCF != null || buildLocalIndex) {
+ mutations = new UngroupedAggregateRegionObserver.MutationList(Ints.saturatedCast(maxBatchSize + maxBatchSize / 10));
+ }
+ region.startRegionOperation();
+ try {
+ synchronized (innerScanner) {
+ do {
+ ungroupedAggregateRegionObserver.checkForRegionClosingOrSplitting();
+ List<Cell> results = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond(), encodingScheme) : new ArrayList<Cell>();
+ // Results are potentially returned even when the return value of s.next is false
+ // since this is an indication of whether or not there are more values after the
+ // ones returned
+ hasMore = innerScanner.nextRaw(results);
+ if (!results.isEmpty()) {
+ lastCell = results.get(0);
+ result.setKeyValues(results);
+ if (isDescRowKeyOrderUpgrade) {
+ if (!descRowKeyOrderUpgrade(results, ptr, mutations)) {
+ continue;
+ }
+ } else if (buildLocalIndex) {
+ buildLocalIndex(result, results, ptr);
+ } else if (isDelete) {
+ deleteRow(results, mutations);
+ } else if (isUpsert) {
+ upsert(result, ptr, mutations);
+ } else if (deleteCF != null && deleteCQ != null) {
+ deleteCForQ(result, results, mutations);
+ }
+ if (emptyCF != null) {
+ /*
+ * If we've specified an emptyCF, then we need to insert an empty
+ * key value "retroactively" for any key value that is visible at
+ * the timestamp that the DDL was issued. Key values that are not
+ * visible at this timestamp will not ever be projected up to
+ * scans past this timestamp, so don't need to be considered.
+ * We insert one empty key value per row per timestamp.
+ */
+ insertEmptyKeyValue(results, mutations);
+ }
+ if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
+ ungroupedAggregateRegionObserver.commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr,
+ txState, targetHTable, useIndexProto, isPKChanging, clientVersionBytes);
+ mutations.clear();
+ }
+ // Commit index mutations in batches based on MUTATE_BATCH_SIZE_ATTRIB and MUTATE_BATCH_SIZE_BYTES_ATTRIB in config
+
+ if (ServerUtil.readyToCommit(indexMutations.size(), indexMutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
+ setIndexAndTransactionProperties(indexMutations, indexUUID, indexMaintainersPtr, txState, clientVersionBytes, useIndexProto);
+ ungroupedAggregateRegionObserver.commitBatch(region, indexMutations, blockingMemStoreSize);
+ indexMutations.clear();
+ }
+ aggregators.aggregate(rowAggregators, result);
+ hasAny = true;
+ }
+ } while (hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime) < pageSizeInMs);
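+ // The loop ends when the scanner is exhausted or the page deadline has passed;
+ // any mutations still buffered are flushed before the page is returned.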
+
+ if (!mutations.isEmpty()) {
+ ungroupedAggregateRegionObserver.commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, txState,
+ targetHTable, useIndexProto, isPKChanging, clientVersionBytes);
+ mutations.clear();
+ }
+ if (!indexMutations.isEmpty()) {
+ ungroupedAggregateRegionObserver.commitBatch(region, indexMutations, blockingMemStoreSize);
+ indexMutations.clear();
+ }
+ }
+ } catch (InsufficientMemoryException e) {
+ throw new DoNotRetryIOException(e);
+ } catch (DataExceedsCapacityException e) {
+ throw new DoNotRetryIOException(e.getMessage(), e);
+ } catch (Throwable e) {
+ LOGGER.error("Exception in UngroupedAggreagteRegionScanner for region "
+ + region.getRegionInfo().getRegionNameAsString(), e);
+ throw e;
+ }
+ Cell keyValue;
+ if (hasAny) {
+ byte[] value = aggregators.toBytes(rowAggregators);
+ keyValue = PhoenixKeyValueUtil.newKeyValue(CellUtil.cloneRow(lastCell), SINGLE_COLUMN_FAMILY, SINGLE_COLUMN,
+ AGG_TIMESTAMP, value, 0, value.length);
+ resultsToReturn.add(keyValue);
+ }
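+ // With paging, hasMore == true may accompany a partial aggregate row; the
+ // client-side UngroupedAggregatingResultIterator merges such rows across pages.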
+ return hasMore;
+ } finally {
+ region.closeRegionOperation();
+ }
+ }
+
+ @Override
+ public long getMaxResultSize() {
+ return scan.getMaxResultSize();
+ }
+}
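
Note for reviewers: the core of PHOENIX-5998 is the time-bounded scan loop in
UngroupedAggregateRegionScanner.next(). The sketch below isolates that pattern
outside of HBase; RowSource and the long-sum "aggregate" are hypothetical
stand-ins, not Phoenix APIs, and only the loop shape mirrors the code above.

    import java.util.List;

    final class PagingSketch {
        /** Hypothetical stand-in for RegionScanner.nextRaw(results). */
        interface RowSource {
            // Fills out with the next row's cells; returns true if more rows remain.
            boolean next(List<String> out);
        }

        /**
         * Aggregates rows until the source is exhausted or the page deadline
         * passes. Returns true when the caller must fetch another page.
         */
        static boolean nextPage(RowSource source, long pageSizeInMs, long[] sum,
                                List<String> row) {
            long start = System.currentTimeMillis();
            boolean hasMore;
            do {
                row.clear();
                hasMore = source.next(row);
                sum[0] += row.size(); // stand-in for aggregators.aggregate(...)
            } while (hasMore && System.currentTimeMillis() - start < pageSizeInMs);
            // hasMore == true means this page holds only a partial aggregate;
            // the client merges pages, as UngroupedAggregatingResultIterator now does.
            return hasMore;
        }
    }

With pageSizeInMs == 0 the loop body runs exactly once per call, which is the
one-row-per-page behavior the test configuration below relies on.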
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index 583baa8..3d7447a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -26,6 +26,7 @@ import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.NO
import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.RENEWED;
import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.THRESHOLD_NOT_REACHED;
import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.UNINITIALIZED;
+import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
import java.io.IOException;
import java.sql.SQLException;
@@ -39,10 +40,12 @@ import javax.annotation.concurrent.GuardedBy;
import org.apache.hadoop.hbase.client.AbstractClientScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException;
import org.apache.phoenix.execute.BaseQueryPlan;
import org.apache.phoenix.execute.MutationState;
@@ -134,6 +137,7 @@ public class TableResultIterator implements ResultIterator {
this.retry=plan.getContext().getConnection().getQueryServices().getProps()
.getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES, QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES);
IndexUtil.setScanAttributesForIndexReadRepair(scan, table, plan.getContext().getConnection());
+ scan.setAttribute(BaseScannerRegionObserver.SERVER_PAGING, TRUE_BYTES);
}
@Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java
index 0bf5982..d19c5b2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java
@@ -21,6 +21,7 @@ import static org.apache.phoenix.query.QueryConstants.*;
import java.sql.SQLException;
+import org.apache.phoenix.expression.aggregator.Aggregator;
import org.apache.phoenix.expression.aggregator.Aggregators;
import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
import org.apache.phoenix.schema.tuple.Tuple;
@@ -33,21 +34,37 @@ public class UngroupedAggregatingResultIterator extends GroupedAggregatingResult
public UngroupedAggregatingResultIterator( PeekingResultIterator resultIterator, Aggregators aggregators) {
super(resultIterator, aggregators);
}
-
@Override
public Tuple next() throws SQLException {
- Tuple result = super.next();
- // Ensure ungrouped aggregregation always returns a row, even if the underlying iterator doesn't.
- if (result == null && !hasRows) {
- // We should reset ClientAggregators here in case they are being reused in a new ResultIterator.
- aggregators.reset(aggregators.getAggregators());
- byte[] value = aggregators.toBytes(aggregators.getAggregators());
- result = new SingleKeyValueTuple(
- PhoenixKeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY,
- SINGLE_COLUMN_FAMILY,
- SINGLE_COLUMN,
- AGG_TIMESTAMP,
- value));
+ Tuple result = resultIterator.next();
+ if (result == null) {
+ // Ensure ungrouped aggregation always returns a row, even if the underlying iterator doesn't.
+ if (!hasRows) {
+ // We should reset ClientAggregators here in case they are being reused in a new ResultIterator.
+ aggregators.reset(aggregators.getAggregators());
+ byte[] value = aggregators.toBytes(aggregators.getAggregators());
+ result = new SingleKeyValueTuple(
+ PhoenixKeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY,
+ SINGLE_COLUMN_FAMILY,
+ SINGLE_COLUMN,
+ AGG_TIMESTAMP,
+ value));
+ }
+ } else {
+ Aggregator[] rowAggregators = aggregators.getAggregators();
+ aggregators.reset(rowAggregators);
+ while (true) {
+ aggregators.aggregate(rowAggregators, result);
+ Tuple nextResult = resultIterator.peek();
+ if (nextResult == null) {
+ break;
+ }
+ result = resultIterator.next();
+ }
+
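+ // Each tuple consumed above is one (possibly partial) per-page aggregate
+ // from the server; serialize the merged result into a single row.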
+ byte[] value = aggregators.toBytes(rowAggregators);
+ result = wrapKeyValueAsResult(PhoenixKeyValueUtil.newKeyValue(
+ UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN,
+ AGG_TIMESTAMP, value, 0, value.length));
}
hasRows = true;
return result;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index e45c30e..b50af9f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -327,6 +327,8 @@ public interface QueryServices extends SQLCloseable {
public static final String LONG_VIEW_INDEX_ENABLED_ATTRIB = "phoenix.index.longViewIndex.enabled";
// The number of index rows to be rebuild in one RPC call
public static final String INDEX_REBUILD_PAGE_SIZE_IN_ROWS = "phoenix.index.rebuild_page_size_in_rows";
+ // The time limit in milliseconds on the server-side processing of one page of an ungrouped aggregate scan
+ public static final String UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS = "phoenix.ungrouped.aggregate_page_size_in_ms";
// Before 4.15 when we created a view we included the parent table column metadata in the view
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 0a73ca9..2122310 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -337,7 +337,8 @@ public class QueryServicesOptions {
public static final long DEFAULT_GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS = 7*24*60*60*1000; /* 7 days */
public static final boolean DEFAULT_INDEX_REGION_OBSERVER_ENABLED = true;
- public static final long DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS = 32*1024;
+ public static final long DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS = 32*1000;
+ public static final long DEFAULT_UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS = 1000; // 1 second
public static final boolean DEFAULT_ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK = false;
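
Note for reviewers: the new page size is a plain Hadoop configuration property
read on the region server, so it can be tuned like any other Phoenix
server-side setting. A minimal sketch, assuming direct Configuration access
(the 500ms value is illustrative only):

    import org.apache.hadoop.conf.Configuration;

    public class PageSizeConfigSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Property name and the 1000ms default come from this commit.
            conf.setLong("phoenix.ungrouped.aggregate_page_size_in_ms", 500L);
            System.out.println(
                conf.getLong("phoenix.ungrouped.aggregate_page_size_in_ms", 1000L));
        }
    }

In production the property would normally be set in hbase-site.xml on the
servers rather than programmatically.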
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index b2a346f..ccc72ec 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.query;
import static org.apache.phoenix.hbase.index.write.ParallelWriterIndexCommitter.NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY;
import static org.apache.phoenix.query.QueryConstants.MILLIS_IN_DAY;
import static org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB;
import static org.apache.phoenix.util.PhoenixRuntime.CURRENT_SCN_ATTRIB;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
@@ -140,7 +141,6 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
import org.apache.phoenix.jdbc.PhoenixTestDriver;
import org.apache.phoenix.schema.NewerTableAlreadyExistsException;
-import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.TableAlreadyExistsException;
import org.apache.phoenix.schema.TableNotFoundException;
@@ -626,6 +626,13 @@ public abstract class BaseTest {
conf.setInt(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, 10000);
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
conf.setInt(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY, 1);
+ conf.setInt(GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, 0);
+ if (conf.getLong(QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS, 0) == 0) {
+ // Explicitly set a 0ms page size so that each next() call of the aggregate region
+ // scanner processes at most one row, i.e., tests run with one-row pages unless a
+ // test configures a larger page size itself.
+ conf.setLong(QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS, 0);
+ }
return conf;
}