You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by an...@apache.org on 2016/10/21 14:04:43 UTC
phoenix git commit: PHOENIX-3161 Check possibility of moving
rebuilding code to coprocessor of data table.
Repository: phoenix
Updated Branches:
refs/heads/master 5c9fb7b68 -> a95e8ab1a
PHOENIX-3161 Check possibility of moving rebuilding code to coprocessor of data table.
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a95e8ab1
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a95e8ab1
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a95e8ab1
Branch: refs/heads/master
Commit: a95e8ab1af2b8defef3d8c0ed5a060c9b9881dd9
Parents: 5c9fb7b
Author: Ankit Singhal <an...@gmail.com>
Authored: Fri Oct 21 19:34:08 2016 +0530
Committer: Ankit Singhal <an...@gmail.com>
Committed: Fri Oct 21 19:34:08 2016 +0530
----------------------------------------------------------------------
.../end2end/index/MutableIndexFailureIT.java | 11 +-
.../apache/phoenix/compile/PostDDLCompiler.java | 2 +-
.../coprocessor/BaseScannerRegionObserver.java | 1 +
.../coprocessor/MetaDataRegionObserver.java | 101 +++++++------------
.../UngroupedAggregateRegionObserver.java | 100 +++++++++++++++++-
.../hbase/index/util/IndexManagementUtil.java | 13 ++-
.../java/org/apache/phoenix/util/ScanUtil.java | 4 +
7 files changed, 154 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a95e8ab1/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index d6c1e9c..4263890 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -92,7 +92,7 @@ public class MutableIndexFailureIT extends BaseOwnClusterIT {
public MutableIndexFailureIT(boolean transactional, boolean localIndex, boolean isNamespaceMapped) {
this.transactional = transactional;
this.localIndex = localIndex;
- this.tableDDLOptions = transactional ? " TRANSACTIONAL=true " : "";
+ this.tableDDLOptions = " SALT_BUCKETS=2 " + (transactional ? ", TRANSACTIONAL=true " : "");
this.tableName = (localIndex ? "L_" : "") + TestUtil.DEFAULT_DATA_TABLE_NAME + (transactional ? "_TXN" : "")
+ (isNamespaceMapped ? "_NM" : "");
this.indexName = INDEX_NAME;
@@ -180,8 +180,8 @@ public class MutableIndexFailureIT extends BaseOwnClusterIT {
query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName;
rs = conn.createStatement().executeQuery("EXPLAIN " + query);
- String expectedPlan = "CLIENT PARALLEL 1-WAY FULL SCAN OVER "
- + SchemaUtil.getPhysicalTableName(fullTableName.getBytes(), isNamespaceMapped);
+ String expectedPlan = "CLIENT PARALLEL 2-WAY FULL SCAN OVER "
+ + SchemaUtil.getPhysicalTableName(fullTableName.getBytes(), isNamespaceMapped)+"\nCLIENT MERGE SORT";
assertEquals(expectedPlan, QueryUtil.getExplainPlan(rs));
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
@@ -232,8 +232,8 @@ public class MutableIndexFailureIT extends BaseOwnClusterIT {
// Verify previous writes succeeded to data table
query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName;
rs = conn.createStatement().executeQuery("EXPLAIN " + query);
- expectedPlan = "CLIENT PARALLEL 1-WAY FULL SCAN OVER "
- + SchemaUtil.getPhysicalTableName(fullTableName.getBytes(), isNamespaceMapped);
+ expectedPlan = "CLIENT PARALLEL 2-WAY FULL SCAN OVER "
+ + SchemaUtil.getPhysicalTableName(fullTableName.getBytes(), isNamespaceMapped)+"\nCLIENT MERGE SORT";
assertEquals(expectedPlan, QueryUtil.getExplainPlan(rs));
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
@@ -254,6 +254,7 @@ public class MutableIndexFailureIT extends BaseOwnClusterIT {
// re-enable index table
FAIL_WRITE = false;
waitForIndexToBeActive(conn,indexName);
+ waitForIndexToBeActive(conn,indexName+"_2");
waitForIndexToBeActive(conn,secondIndexName);
// Verify UPSERT on data table still work after index table is recreated
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a95e8ab1/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
index 004e254..393499a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
@@ -210,7 +210,7 @@ public class PostDDLCompiler {
if (ts!=HConstants.LATEST_TIMESTAMP && tableRef.getTable().isTransactional()) {
ts = TransactionUtil.convertToNanoseconds(ts);
}
- ScanUtil.setTimeRange(scan, ts);
+ ScanUtil.setTimeRange(scan, scan.getTimeRange().getMin(), ts);
if (emptyCF != null) {
scan.setAttribute(BaseScannerRegionObserver.EMPTY_CF, emptyCF);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a95e8ab1/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 4fa1399..f6bd512 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -88,6 +88,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
public static final String EXPECTED_UPPER_REGION_KEY = "_ExpectedUpperRegionKey";
public static final String REVERSE_SCAN = "_ReverseScan";
public static final String ANALYZE_TABLE = "_ANALYZETABLE";
+ public static final String REBUILD_INDEXES = "_RebuildIndexes";
public static final String TX_STATE = "_TxState";
public static final String GUIDEPOST_WIDTH_BYTES = "_GUIDEPOST_WIDTH_BYTES";
public static final String GUIDEPOST_PER_REGION = "_GUIDEPOST_PER_REGION";
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a95e8ab1/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index c645cf4..e790b59 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -17,8 +17,9 @@
*/
package org.apache.phoenix.coprocessor;
+import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
+
import java.io.IOException;
-import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
@@ -33,16 +34,12 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -55,19 +52,19 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.phoenix.cache.GlobalCache;
-import org.apache.phoenix.cache.ServerCacheClient;
+import org.apache.phoenix.compile.MutationPlan;
+import org.apache.phoenix.compile.PostDDLCompiler;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixDriver;
-import org.apache.phoenix.parse.AlterIndexStatement;
-import org.apache.phoenix.parse.NamedTableNode;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.MetaDataClient;
@@ -75,7 +72,7 @@ import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.TableNotFoundException;
-import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.MetaDataUtil;
@@ -281,6 +278,11 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
// Set SCN so that we don't ping server and have the upper bound set back to
// the timestamp when the failure occurred.
props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(Long.MAX_VALUE));
+
+ //Set timeout to max value as rebuilding may take time
+ props.setProperty(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, Long.toString(Long.MAX_VALUE));
+ props.setProperty(QueryServices.HBASE_CLIENT_SCANNER_TIMEOUT_ATTRIB, Long.toString(Long.MAX_VALUE));
+ props.setProperty(QueryServices.RPC_TIMEOUT_ATTRIB, Long.toString(Long.MAX_VALUE));
// don't run a second index populations upsert select
props.setProperty(QueryServices.INDEX_POPULATION_SLEEP_TIME, "0");
conn = QueryUtil.getConnectionOnServer(props, env.getConfiguration()).unwrap(PhoenixConnection.class);
@@ -305,7 +307,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
// Allow index to begin incremental maintenance as index is back online and we
// cannot transition directly from DISABLED -> ACTIVE
if (Bytes.compareTo(PIndexState.DISABLE.getSerializedBytes(), indexState) == 0) {
- updateIndexState(indexTableFullName, env, PIndexState.DISABLE, PIndexState.INACTIVE);
+ updateIndexState(conn, indexTableFullName, env, PIndexState.DISABLE, PIndexState.INACTIVE);
}
List<PTable> indexesToPartiallyRebuild = dataTableToIndexesMap.get(dataPTable);
if (indexesToPartiallyRebuild == null) {
@@ -344,62 +346,31 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
long timeStamp = Math.max(0, earliestDisableTimestamp - overlapTime);
LOG.info("Starting to build " + dataPTable + " indexes " + indexesToPartiallyRebuild
+ " from timestamp=" + timeStamp);
- Scan dataTableScan = IndexManagementUtil.newLocalStateScan(maintainers);
+ TableRef tableRef = new TableRef(null, dataPTable, HConstants.LATEST_TIMESTAMP, false);
+ // TODO Need to set high timeout
+ PostDDLCompiler compiler = new PostDDLCompiler(conn);
+ MutationPlan plan = compiler.compile(Collections.singletonList(tableRef), null, null, null,
+ HConstants.LATEST_TIMESTAMP);
+ Scan dataTableScan = IndexManagementUtil.newLocalStateScan(plan.getContext().getScan(),
+ maintainers);
dataTableScan.setTimeRange(timeStamp, HConstants.LATEST_TIMESTAMP);
- byte[] physicalTableName = dataPTable.getPhysicalName().getBytes();
- try (HTableInterface dataHTable = conn.getQueryServices().getTable(physicalTableName)) {
- Result result;
- try (ResultScanner dataTableScanner = dataHTable.getScanner(dataTableScan)) {
- int batchSize = conn.getMutateBatchSize();
- List<Mutation> mutations = Lists.newArrayListWithExpectedSize(batchSize);
- ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(
- ByteUtil.EMPTY_BYTE_ARRAY);
- IndexMaintainer.serializeAdditional(dataPTable, indexMetaDataPtr,
- indexesToPartiallyRebuild, conn);
- byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
- byte[] uuidValue = ServerCacheClient.generateId();
+ dataTableScan.setCacheBlocks(false);
+ dataTableScan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
+
+ ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(
+ ByteUtil.EMPTY_BYTE_ARRAY);
+ IndexMaintainer.serializeAdditional(dataPTable, indexMetaDataPtr, indexesToPartiallyRebuild,
+ conn);
+ byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
- while ((result = dataTableScanner.next()) != null && !result.isEmpty()) {
- Put put = null;
- Delete del = null;
- for (Cell cell : result.rawCells()) {
- if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
- if (put == null) {
- put = new Put(CellUtil.cloneRow(cell));
- put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
- put.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
- put.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS,
- PDataType.TRUE_BYTES);
- mutations.add(put);
- }
- put.add(cell);
- } else {
- if (del == null) {
- del = new Delete(CellUtil.cloneRow(cell));
- del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
- del.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
- del.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS,
- PDataType.TRUE_BYTES);
- mutations.add(del);
- }
- del.addDeleteMarker(cell);
- }
- }
- if (mutations.size() == batchSize) {
- dataHTable.batch(mutations);
- uuidValue = ServerCacheClient.generateId();
- mutations.clear();
- }
- }
- if (!mutations.isEmpty()) {
- dataHTable.batch(mutations);
- }
- }
- }
+ dataTableScan.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+ MutationState mutationState = plan.execute();
+ long rowCount = mutationState.getUpdateCount();
+ LOG.info(rowCount + " rows of index which are rebuild");
for (PTable indexPTable : indexesToPartiallyRebuild) {
String indexTableFullName = SchemaUtil.getTableName(indexPTable.getSchemaName()
.getString(), indexPTable.getTableName().getString());
- updateIndexState(indexTableFullName, env, PIndexState.INACTIVE, PIndexState.ACTIVE);
+ updateIndexState(conn, indexTableFullName, env, PIndexState.INACTIVE, PIndexState.ACTIVE);
}
} catch (Exception e) { // Log, but try next table's indexes
LOG.warn("Unable to rebuild " + dataPTable + " indexes " + indexesToPartiallyRebuild
@@ -429,11 +400,11 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
}
}
- private static void updateIndexState(String indexTableName, RegionCoprocessorEnvironment env, PIndexState oldState,
+ private static void updateIndexState(PhoenixConnection conn, String indexTableName, RegionCoprocessorEnvironment env, PIndexState oldState,
PIndexState newState) throws ServiceException, Throwable {
byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName);
String schemaName = SchemaUtil.getSchemaNameFromFullName(indexTableName);
- String indexName = SchemaUtil.getSchemaNameFromFullName(indexTableName);
+ String indexName = SchemaUtil.getTableNameFromFullName(indexTableName);
// Mimic the Put that gets generated by the client on an update of the index state
Put put = new Put(indexTableKey);
put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
@@ -443,9 +414,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
PLong.INSTANCE.toBytes(0));
}
final List<Mutation> tableMetadata = Collections.<Mutation> singletonList(put);
- Connection conn = QueryUtil.getConnection(env.getConfiguration());
- MetaDataMutationResult result = conn.unwrap(PhoenixConnection.class).getQueryServices()
- .updateIndexState(tableMetadata, null);
+ MetaDataMutationResult result = conn.getQueryServices().updateIndexState(tableMetadata, null);
MutationCode code = result.getMutationCode();
if (code == MutationCode.TABLE_NOT_FOUND) { throw new TableNotFoundException(schemaName, indexName); }
if (code == MutationCode.UNALLOWED_TABLE_MUTATION) { throw new SQLExceptionInfo.Builder(
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a95e8ab1/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 0f175c5..38f7253 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -43,6 +43,7 @@ import javax.annotation.concurrent.GuardedBy;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
@@ -66,6 +67,7 @@ import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.cache.ServerCacheClient;
import org.apache.phoenix.coprocessor.generated.PTableProtos;
import org.apache.phoenix.exception.DataExceedsCapacityException;
import org.apache.phoenix.execute.TupleProjector;
@@ -263,7 +265,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
env, region.getRegionInfo().getTable().getNameAsString(), ts,
gp_width_bytes, gp_per_region_bytes);
return collectStats(s, statsCollector, region, scan, env.getConfiguration());
- }
+ } else if (ScanUtil.isIndexRebuild(scan)) { return rebuildIndices(s, region, scan, env.getConfiguration()); }
int offsetToBe = 0;
if (localIndexScan) {
/*
@@ -725,6 +727,102 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
}
+ private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Region region, final Scan scan,
+ Configuration config) throws IOException {
+ byte[] indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
+ boolean hasMore;
+ long rowCount = 0;
+ try {
+ int batchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+ List<Mutation> mutations = Lists.newArrayListWithExpectedSize(batchSize);
+ region.startRegionOperation();
+ byte[] uuidValue = ServerCacheClient.generateId();
+ synchronized (innerScanner) {
+ do {
+ List<Cell> results = new ArrayList<Cell>();
+ hasMore = innerScanner.nextRaw(results);
+ if (!results.isEmpty()) {
+ Put put = null;
+ Delete del = null;
+ for (Cell cell : results) {
+
+ if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
+ if (put == null) {
+ put = new Put(CellUtil.cloneRow(cell));
+ put.setAttribute(PhoenixIndexCodec.INDEX_MD, indexMetaData);
+ put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+ put.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS,
+ PDataType.TRUE_BYTES);
+ mutations.add(put);
+ }
+ put.add(cell);
+ } else {
+ if (del == null) {
+ del = new Delete(CellUtil.cloneRow(cell));
+ del.setAttribute(PhoenixIndexCodec.INDEX_MD, indexMetaData);
+ del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+ del.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS,
+ PDataType.TRUE_BYTES);
+ mutations.add(del);
+ }
+ del.addDeleteMarker(cell);
+ }
+ }
+ if (mutations.size() >= batchSize) {
+ region.batchMutate(mutations.toArray(new Mutation[mutations.size()]), HConstants.NO_NONCE,
+ HConstants.NO_NONCE);
+ uuidValue = ServerCacheClient.generateId();
+ mutations.clear();
+ }
+ rowCount++;
+ }
+
+ } while (hasMore);
+ if (!mutations.isEmpty()) {
+ region.batchMutate(mutations.toArray(new Mutation[mutations.size()]), HConstants.NO_NONCE,
+ HConstants.NO_NONCE);
+ }
+ }
+ } catch (IOException e) {
+ logger.error("IOException during rebuilding: " + Throwables.getStackTraceAsString(e));
+ throw e;
+ } finally {
+ region.closeRegionOperation();
+ }
+ byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount));
+ final KeyValue aggKeyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY,
+ SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length);
+
+ RegionScanner scanner = new BaseRegionScanner(innerScanner) {
+ @Override
+ public HRegionInfo getRegionInfo() {
+ return region.getRegionInfo();
+ }
+
+ @Override
+ public boolean isFilterDone() {
+ return true;
+ }
+
+ @Override
+ public void close() throws IOException {
+ // no-op because we want to manage closing of the inner scanner ourselves.
+ }
+
+ @Override
+ public boolean next(List<Cell> results) throws IOException {
+ results.add(aggKeyValue);
+ return false;
+ }
+
+ @Override
+ public long getMaxResultSize() {
+ return scan.getMaxResultSize();
+ }
+ };
+ return scanner;
+ }
+
private RegionScanner collectStats(final RegionScanner innerScanner, StatisticsCollector stats,
final Region region, final Scan scan, Configuration config) throws IOException {
StatsCollectionCallable callable =
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a95e8ab1/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
index 4519145..c6642e7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
@@ -20,18 +20,14 @@ package org.apache.phoenix.hbase.index.util;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
-import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
-
-import com.google.common.collect.Maps;
import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.hbase.index.builder.IndexBuildingFailureException;
import org.apache.phoenix.hbase.index.covered.data.LazyValueGetter;
@@ -150,7 +146,14 @@ public class IndexManagementUtil {
}
public static Scan newLocalStateScan(List<? extends Iterable<? extends ColumnReference>> refsArray) {
- Scan s = new Scan();
+ return newLocalStateScan(null, refsArray);
+ }
+
+ public static Scan newLocalStateScan(Scan scan, List<? extends Iterable<? extends ColumnReference>> refsArray) {
+ Scan s = scan;
+ if (scan == null) {
+ s = new Scan();
+ }
s.setRaw(true);
// add the necessary columns to the scan
for (Iterable<? extends ColumnReference> refs : refsArray) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a95e8ab1/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index b0e8a99..acaeb31 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -895,4 +895,8 @@ public class ScanUtil {
return true;
}
+ public static boolean isIndexRebuild(Scan scan) {
+ return scan.getAttribute((BaseScannerRegionObserver.REBUILD_INDEXES)) != null;
+ }
+
}
\ No newline at end of file