You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/05/08 08:23:03 UTC
git commit: PHOENIX-937 Handle puts on local index table (Rajeshbabu)
Repository: incubator-phoenix
Updated Branches:
refs/heads/local-index 360f5a1cd -> 276de4a19
PHOENIX-937 Handle puts on local index table (Rajeshbabu)
Project: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/commit/276de4a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/tree/276de4a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/diff/276de4a1
Branch: refs/heads/local-index
Commit: 276de4a195b3bdb851df9cf87d2626a1218c65d0
Parents: 360f5a1
Author: James Taylor <jt...@salesforce.com>
Authored: Wed May 7 23:23:37 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed May 7 23:23:37 2014 -0700
----------------------------------------------------------------------
.../phoenix/end2end/index/LocalIndexIT.java | 80 ++++++++++++++++++--
.../coprocessor/BaseScannerRegionObserver.java | 1 +
.../UngroupedAggregateRegionObserver.java | 65 +++++++++++++++-
.../hbase/index/util/IndexManagementUtil.java | 12 +--
.../apache/phoenix/index/IndexMaintainer.java | 4 +-
.../apache/phoenix/schema/MetaDataClient.java | 54 ++++++++++++-
6 files changed, 197 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/276de4a1/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
index 59bbc7f..6bc1b90 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
@@ -33,6 +33,10 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.hbase.index.IndexRegionSplitPolicy;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
@@ -67,8 +71,8 @@ public class LocalIndexIT extends BaseIndexIT {
"k2 INTEGER NOT NULL,\n" +
"v1 VARCHAR,\n" +
"CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))\n"
- + (saltBuckets == null || splits != null ? "" : (",salt_buckets=" + saltBuckets)
- + (saltBuckets != null || splits == null ? "" : ",splits=" + splits));
+ + (saltBuckets != null && splits == null ? (",salt_buckets=" + saltBuckets) : ""
+ + (saltBuckets == null && splits != null ? (" split on " + splits) : ""));
conn.createStatement().execute(ddl);
conn.close();
}
@@ -91,13 +95,13 @@ public class LocalIndexIT extends BaseIndexIT {
Connection conn1 = DriverManager.getConnection(getUrl());
Connection conn2 = DriverManager.getConnection(getUrl());
try {
- conn1.createStatement().execute("CREATE LOCAL INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_NAME + "(v1)"+" splits={1,2,3}");
+ conn1.createStatement().execute("CREATE LOCAL INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_NAME + "(v1)"+" split on (1,2,3)");
fail("Local index cannot be pre-split");
} catch (SQLException e) { }
try {
conn2.createStatement().executeQuery("SELECT * FROM " + DATA_TABLE_FULL_NAME).next();
conn2.unwrap(PhoenixConnection.class).getMetaDataCache().getTable(new PTableKey(null,INDEX_TABLE_NAME));
- fail("Local index should be created.");
+ fail("Local index should not be created.");
} catch (TableNotFoundException e) { }
}
@@ -119,7 +123,7 @@ public class LocalIndexIT extends BaseIndexIT {
@Test
public void testLocalIndexTableRegionSplitPolicyAndSplitKeys() throws Exception {
- createBaseTable(DATA_TABLE_NAME, null,"{1,2,3}");
+ createBaseTable(DATA_TABLE_NAME, null,"('e','i','o')");
Connection conn1 = DriverManager.getConnection(getUrl());
Connection conn2 = DriverManager.getConnection(getUrl());
conn1.createStatement().execute("CREATE LOCAL INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_NAME + "(v1)");
@@ -150,4 +154,70 @@ public class LocalIndexIT extends BaseIndexIT {
+ " FROM " + PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME);
assertFalse("View index sequences should be deleted.", rs.next());
}
+
+ @Test
+ public void testPutsToLocalIndexTable() throws Exception {
+ createBaseTable(DATA_TABLE_NAME, null, "('e','i','o')");
+ Connection conn1 = DriverManager.getConnection(getUrl());
+ conn1.createStatement().execute("CREATE LOCAL INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_NAME + "(v1)");
+ conn1.createStatement().execute("UPSERT INTO "+DATA_TABLE_NAME+" values('b',1,2,'z')");
+ conn1.createStatement().execute("UPSERT INTO "+DATA_TABLE_NAME+" values('f',1,2,'z')");
+ conn1.createStatement().execute("UPSERT INTO "+DATA_TABLE_NAME+" values('j',2,4,'a')");
+ conn1.createStatement().execute("UPSERT INTO "+DATA_TABLE_NAME+" values('q',3,1,'c')");
+ conn1.commit();
+ ResultSet rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + INDEX_TABLE_NAME);
+ assertTrue(rs.next());
+ assertEquals(4, rs.getInt(1));
+ HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
+ HTable indexTable = new HTable(admin.getConfiguration() ,TableName.valueOf(MetaDataUtil.getLocalIndexTableName(DATA_TABLE_NAME)));
+ Pair<byte[][], byte[][]> startEndKeys = indexTable.getStartEndKeys();
+ byte[][] startKeys = startEndKeys.getFirst();
+ byte[][] endKeys = startEndKeys.getSecond();
+ for (int i = 0; i < startKeys.length; i++) {
+ Scan s = new Scan();
+ s.setStartRow(startKeys[i]);
+ s.setStopRow(endKeys[i]);
+ ResultScanner scanner = indexTable.getScanner(s);
+ int count = 0;
+ for(Result r:scanner){
+ count++;
+ }
+ scanner.close();
+ assertEquals(1, count);
+ }
+ indexTable.close();
+ }
+
+ @Test
+ public void testBuildIndexWhenUserTableAlreadyHasData() throws Exception {
+ createBaseTable(DATA_TABLE_NAME, null, "('e','i','o')");
+ Connection conn1 = DriverManager.getConnection(getUrl());
+ conn1.createStatement().execute("UPSERT INTO "+DATA_TABLE_NAME+" values('b',1,2,'z')");
+ conn1.createStatement().execute("UPSERT INTO "+DATA_TABLE_NAME+" values('f',1,2,'z')");
+ conn1.createStatement().execute("UPSERT INTO "+DATA_TABLE_NAME+" values('j',2,4,'a')");
+ conn1.createStatement().execute("UPSERT INTO "+DATA_TABLE_NAME+" values('q',3,1,'c')");
+ conn1.commit();
+ conn1.createStatement().execute("CREATE LOCAL INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_NAME + "(v1)");
+ ResultSet rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + INDEX_TABLE_NAME);
+ assertTrue(rs.next());
+ assertEquals(4, rs.getInt(1));
+ HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
+ HTable indexTable = new HTable(admin.getConfiguration() ,TableName.valueOf(MetaDataUtil.getLocalIndexTableName(DATA_TABLE_NAME)));
+ Pair<byte[][], byte[][]> startEndKeys = indexTable.getStartEndKeys();
+ byte[][] startKeys = startEndKeys.getFirst();
+ byte[][] endKeys = startEndKeys.getSecond();
+ for (int i = 0; i < startKeys.length; i++) {
+ Scan s = new Scan();
+ s.setStartRow(startKeys[i]);
+ s.setStopRow(endKeys[i]);
+ ResultScanner scanner = indexTable.getScanner(s);
+ int count = 0;
+ for(Result r:scanner){
+ count++;
+ }
+ scanner.close();
+ assertEquals(1, count);
+ }
+ indexTable.close();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/276de4a1/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 6f0124a..9e42f64 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -45,6 +45,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
public static final String SPECIFIC_ARRAY_INDEX = "_SpecificArrayIndex";
public static final String GROUP_BY_LIMIT = "_GroupByLimit";
public static final String LOCAL_INDEX = "_LocalIndex";
+ public static final String LOCAL_INDEX_BUILD = "_LocalIndexBuild";
/**
* Used by logger to identify coprocessor
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/276de4a1/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 0139400..799bc96 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -21,6 +21,7 @@ import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
+import static org.apache.phoenix.query.QueryConstants.EMPTY_COLUMN_BYTES_PTR;
import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
import java.io.ByteArrayInputStream;
@@ -31,6 +32,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -38,6 +40,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
@@ -56,8 +59,12 @@ import org.apache.phoenix.expression.ExpressionType;
import org.apache.phoenix.expression.aggregator.Aggregator;
import org.apache.phoenix.expression.aggregator.Aggregators;
import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
+import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.join.ScanProjector;
@@ -73,6 +80,7 @@ import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.slf4j.Logger;
@@ -130,6 +138,10 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
if (isUngroupedAgg == null) {
return s;
}
+
+ byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
+ List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes);
+ List<Mutation> indexMutations = localIndexBytes == null ? Collections.<Mutation>emptyList() : Lists.<Mutation>newArrayListWithExpectedSize(1024);
final ScanProjector p = ScanProjector.deserializeProjectorFromScan(scan);
final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
@@ -165,6 +177,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
emptyCF = scan.getAttribute(BaseScannerRegionObserver.EMPTY_CF);
}
+ if(localIndexBytes != null) {
+ ptr = new ImmutableBytesWritable();
+ }
int batchSize = 0;
long ts = scan.getTimeRange().getMax();
@@ -197,7 +212,26 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
rowCount++;
result.setKeyValues(results);
try {
- if (isDelete) {
+ if (indexMaintainers != null) {
+ for (IndexMaintainer maintainer : indexMaintainers) {
+ ImmutableBytesPtr emptyKeyValueFamily = maintainer.getEmptyKeyValueFamily();
+ Iterator<Cell> iterator = results.iterator();
+ while (iterator.hasNext()) {
+ Cell cell = iterator.next();
+ if (Bytes.compareTo(cell.getRowArray(), cell.getFamilyOffset(), cell.getFamilyLength(), emptyKeyValueFamily.get(), emptyKeyValueFamily.getOffset(), emptyKeyValueFamily.getLength()) == 0
+ && Bytes.compareTo(cell.getRowArray(), cell.getQualifierOffset(), cell.getQualifierLength(), EMPTY_COLUMN_BYTES_PTR.get(), EMPTY_COLUMN_BYTES_PTR.getOffset(), EMPTY_COLUMN_BYTES_PTR.getLength()) == 0) {
+ iterator.remove();
+ }
+ }
+ if (!results.isEmpty()) {
+ result.getKey(ptr);
+ ValueGetter valueGetter = IndexManagementUtil.createGetterFromKeyValues(results);
+ Put put = maintainer.buildUpdateMutation(kvBuilder, valueGetter, ptr, ts, c.getEnvironment().getRegion().getStartKey());
+ indexMutations.add(put);
+ }
+ }
+ result.setKeyValues(results);
+ } else if (isDelete) {
// FIXME: the version of the Delete constructor without the lock args was introduced
// in 0.94.4, thus if we try to use it here we can no longer use the 0.94.2 version
// of the client.
@@ -278,6 +312,14 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
commitBatch(region, mutations, indexUUID);
mutations.clear();
}
+ // Commit index mutations in batches based on MUTATE_BATCH_SIZE_ATTRIB in config
+ if (!indexMutations.isEmpty() && batchSize > 0 && indexMutations.size() % batchSize == 0) {
+ HRegion indexRegion = getIndexRegion(c.getEnvironment());
+ // Get indexRegion corresponding to data region
+ commitBatch(indexRegion, indexMutations, null);
+ indexMutations.clear();
+ }
+
} catch (ConstraintViolationException e) {
// Log and ignore in count
logger.error("Failed to create row in " + region.getRegionNameAsString() + " with values " + SchemaUtil.toString(values), e);
@@ -300,6 +342,13 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
commitBatch(region,mutations, indexUUID);
}
+ if (!indexMutations.isEmpty()) {
+ HRegion indexRegion = getIndexRegion(c.getEnvironment());
+ // Get indexRegion corresponding to data region
+ commitBatch(indexRegion, indexMutations, null);
+ indexMutations.clear();
+ }
+
final boolean hadAny = hasAny;
KeyValue keyValue = null;
if (hadAny) {
@@ -341,7 +390,19 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
};
return scanner;
}
-
+
+ private HRegion getIndexRegion(RegionCoprocessorEnvironment environment) throws IOException {
+ HRegion userRegion = environment.getRegion();
+ TableName indexTableName = TableName.valueOf(MetaDataUtil.getLocalIndexPhysicalName(userRegion.getTableDesc().getName()));
+ List<HRegion> onlineRegions = environment.getRegionServerServices().getOnlineRegions(indexTableName);
+ for(HRegion indexRegion : onlineRegions) {
+ if (Bytes.compareTo(userRegion.getStartKey(), indexRegion.getStartKey()) == 0) {
+ return indexRegion;
+ }
+ }
+ return null;
+ }
+
private static PTable deserializeTable(byte[] b) {
try {
PTableProtos.PTable ptableProto = PTableProtos.PTable.parseFrom(b);
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/276de4a1/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
index 2199b4f..917b02c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
@@ -25,6 +25,7 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
@@ -100,15 +101,14 @@ public class IndexManagementUtil {
}
- public static ValueGetter createGetterFromKeyValues(Collection<KeyValue> pendingUpdates) {
+ public static ValueGetter createGetterFromKeyValues(Collection<Cell> pendingUpdates) {
final Map<ReferencingColumn, ImmutableBytesPtr> valueMap = Maps.newHashMapWithExpectedSize(pendingUpdates
.size());
- for (KeyValue kv : pendingUpdates) {
+ for (Cell kv : pendingUpdates) {
// create new pointers to each part of the kv
- ImmutableBytesPtr family = new ImmutableBytesPtr(kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength());
- ImmutableBytesPtr qual = new ImmutableBytesPtr(kv.getBuffer(), kv.getQualifierOffset(),
- kv.getQualifierLength());
- ImmutableBytesPtr value = new ImmutableBytesPtr(kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
+ ImmutableBytesPtr family = new ImmutableBytesPtr(kv.getRowArray(),kv.getFamilyOffset(),kv.getFamilyLength());
+ ImmutableBytesPtr qual = new ImmutableBytesPtr(kv.getRowArray(), kv.getQualifierOffset(), kv.getQualifierLength());
+ ImmutableBytesPtr value = new ImmutableBytesPtr(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
valueMap.put(new ReferencingColumn(family, qual), value);
}
return new ValueGetter() {
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/276de4a1/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 1ca330f..a3a8791 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -511,7 +511,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
return allColumns;
}
- private ImmutableBytesPtr getEmptyKeyValueFamily() {
+ public ImmutableBytesPtr getEmptyKeyValueFamily() {
// Since the metadata of an index table will never change,
// we can infer this based on the family of the first covered column
// If there are no covered columns, we know it's our default name
@@ -697,7 +697,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
}
private int getIndexPkColumnCount() {
- return dataRowKeySchema.getFieldCount() + indexedColumns.size() - (isDataTableSalted ? 1 : 0) - (isMultiTenant ? 1 : 0) - (viewIndexId == null ? 0 : 1);
+ return dataRowKeySchema.getFieldCount() + indexedColumns.size() - (isDataTableSalted ? 1 : 0) - (isMultiTenant ? 1 : 0) /*+ (viewIndexId == null ? 0 : 1)*/;
}
private RowKeyMetaData newRowKeyMetaData() {
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/276de4a1/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 034f212..b420c15 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -75,6 +75,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -82,6 +83,8 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compile.ColumnResolver;
@@ -89,14 +92,20 @@ import org.apache.phoenix.compile.FromCompiler;
import org.apache.phoenix.compile.MutationPlan;
import org.apache.phoenix.compile.PostDDLCompiler;
import org.apache.phoenix.compile.PostIndexDDLCompiler;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.parse.AddColumnStatement;
import org.apache.phoenix.parse.AlterIndexStatement;
import org.apache.phoenix.parse.ColumnDef;
@@ -489,9 +498,43 @@ public class MetaDataClient {
connection.rollback();
try {
connection.setAutoCommit(true);
- PostIndexDDLCompiler compiler = new PostIndexDDLCompiler(connection, dataTableRef);
- MutationPlan plan = compiler.compile(index);
- MutationState state = connection.getQueryServices().updateData(plan);
+ MutationState state;
+ // For local indexes, we optimize the initial index population by *not* sending Puts over
+ // the wire for the index rows, as we don't need to do that. Instead, we tap into our
+ // region observer to generate the index rows based on the data rows as we scan
+ if (index.getIndexType() == IndexType.LOCAL) {
+ final PhoenixStatement statement = new PhoenixStatement(connection);
+ String query = "SELECT count(*) FROM \"" + dataTableRef.getTable().getName().getString() + "\"";
+ QueryPlan plan = statement.compileQuery(query);
+ // Set attribute on scan that UngroupedAggregateRegionObserver will switch on.
+ // We'll detect on the server side that this attribute was set and write the index
+ // rows per region as a result. The value of the attribute will be our persisted
+ // index maintainers.
+ // The attribute name, LOCAL_INDEX_BUILD, is a static constant in BaseScannerRegionObserver.
+ Scan scan = plan.getContext().getScan();
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ PTable dataTable = dataTableRef.getTable();
+ dataTable.getIndexMaintainers(ptr);
+ scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD, ByteUtil.copyKeyBytesIfNecessary(ptr));
+ // By default, we'd use a FirstKeyOnly filter as nothing else needs to be projected for count(*).
+ // However, in this case, we need to project all of the data columns that contribute to the index.
+ IndexMaintainer indexMaintainer = index.getIndexMaintainer(dataTable);
+ for (ColumnReference columnRef : indexMaintainer.getAllColumns()) {
+ scan.addColumn(columnRef.getFamily(), columnRef.getQualifier());
+ }
+ Cell kv = plan.iterator().next().getValue(0);
+ ImmutableBytesWritable tmpPtr = new ImmutableBytesWritable(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
+ // A single Cell will be returned with the count(*) - we decode that here
+ long rowCount = PDataType.LONG.getCodec().decodeLong(tmpPtr, SortOrder.getDefault());
+ // The contract is to return a MutationState that contains the number of rows modified. In this
+ // case, it's the number of rows in the data table which corresponds to the number of index
+ // rows that were added.
+ state = new MutationState(0, connection, rowCount);
+ } else {
+ PostIndexDDLCompiler compiler = new PostIndexDDLCompiler(connection, dataTableRef);
+ MutationPlan plan = compiler.compile(index);
+ state = connection.getQueryServices().updateData(plan);
+ }
AlterIndexStatement indexStatement = FACTORY.alterIndex(FACTORY.namedTable(null,
TableName.create(index.getSchemaName().getString(), index.getTableName().getString())),
dataTableRef.getTable().getTableName().getString(), false, PIndexState.ACTIVE);
@@ -685,7 +728,10 @@ public class MetaDataClient {
if (connection.getSCN() != null) {
return buildIndexAtTimeStamp(table, statement.getTable());
}
-
+ if (statement.getIndexType() == IndexType.LOCAL) {
+ ColumnResolver resolver = FromCompiler.getResolverForMutation(statement, connection);
+ tableRef = resolver.getTables().get(0);
+ }
return buildIndex(table, tableRef);
}