You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ra...@apache.org on 2014/09/30 04:13:09 UTC
git commit: PHOENIX-1249 Support local immutable index
Repository: phoenix
Updated Branches:
refs/heads/master abd049d88 -> f28fb8b7c
PHOENIX-1249 Support local immutable index
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f28fb8b7
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f28fb8b7
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f28fb8b7
Branch: refs/heads/master
Commit: f28fb8b7c5075ccb7a522baa58c2b3ab3003dbe7
Parents: abd049d
Author: Rajeshbabu Chintaguntla <ra...@huawei.com>
Authored: Tue Sep 30 07:40:17 2014 +0530
Committer: Rajeshbabu Chintaguntla <ra...@huawei.com>
Committed: Tue Sep 30 07:40:17 2014 +0530
----------------------------------------------------------------------
.../org/apache/phoenix/end2end/DeleteIT.java | 11 +-
.../phoenix/end2end/index/ImmutableIndexIT.java | 74 ++++++++++++--
.../phoenix/end2end/index/LocalIndexIT.java | 60 +++++++----
.../apache/phoenix/compile/DeleteCompiler.java | 13 ++-
.../hbase/index/covered/IndexUpdate.java | 2 +-
.../apache/phoenix/index/IndexMaintainer.java | 55 ++++++++--
.../phoenix/index/PhoenixIndexBuilder.java | 20 +++-
.../apache/phoenix/index/PhoenixIndexCodec.java | 101 +++++++++++--------
.../apache/phoenix/schema/MetaDataClient.java | 3 -
9 files changed, 255 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f28fb8b7/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java
index 337e49b..677fb53 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java
@@ -344,6 +344,15 @@ public class DeleteIT extends BaseHBaseManagedTimeIT {
@Test
public void testDeleteRowFromTableWithImmutableIndex() throws SQLException {
+ testDeleteRowFromTableWithImmutableIndex(false);
+ }
+
+ @Test
+ public void testDeleteRowFromTableWithImmutableLocalIndex() throws SQLException {
+ testDeleteRowFromTableWithImmutableIndex(true);
+ }
+
+ public void testDeleteRowFromTableWithImmutableIndex(boolean localIndex) throws SQLException {
Connection con = null;
try {
boolean autoCommit = false;
@@ -360,7 +369,7 @@ public class DeleteIT extends BaseHBaseManagedTimeIT {
"USAGE.DB BIGINT," +
"STATS.ACTIVE_VISITOR INTEGER " +
"CONSTRAINT PK PRIMARY KEY (HOST, DOMAIN, FEATURE, DATE)) IMMUTABLE_ROWS=true");
- stm.execute("CREATE INDEX web_stats_idx ON web_stats (DATE, FEATURE)");
+ stm.execute("CREATE " + (localIndex ? "LOCAL" : "") + " INDEX web_stats_idx ON web_stats (DATE, FEATURE)");
stm.close();
Date date = new Date(0);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f28fb8b7/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
index b522931..c1a50da 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
@@ -113,12 +113,21 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT {
@Test
public void testIndexWithNullableFixedWithCols() throws Exception {
+ testIndexWithNullableFixedWithCols(false);
+ }
+
+ @Test
+ public void testLocalIndexWithNullableFixedWithCols() throws Exception {
+ testIndexWithNullableFixedWithCols(true);
+ }
+
+ public void testIndexWithNullableFixedWithCols(boolean localIndex) throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
Connection conn = DriverManager.getConnection(getUrl(), props);
conn.setAutoCommit(false);
ensureTableCreated(getUrl(), INDEX_DATA_TABLE);
populateTestTable();
- String ddl = "CREATE INDEX IDX ON " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + INDEX_DATA_TABLE
+ String ddl = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX IDX ON " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + INDEX_DATA_TABLE
+ " (char_col1 ASC, int_col1 ASC)"
+ " INCLUDE (long_col1, long_col2)";
PreparedStatement stmt = conn.prepareStatement(ddl);
@@ -126,7 +135,13 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT {
String query = "SELECT char_col1, int_col1 from " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + INDEX_DATA_TABLE;
ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + query);
- assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER INDEX_TEST.IDX", QueryUtil.getExplainPlan(rs));
+ if(localIndex) {
+ assertEquals(
+ "CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_INDEX_TEST.INDEX_DATA_TABLE [-32768]\nCLIENT MERGE SORT",
+ QueryUtil.getExplainPlan(rs));
+ } else {
+ assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER INDEX_TEST.IDX", QueryUtil.getExplainPlan(rs));
+ }
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
@@ -191,12 +206,21 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT {
@Test
public void testDeleteFromAllPKColumnIndex() throws Exception {
+ testDeleteFromAllPKColumnIndex(false);
+ }
+
+ @Test
+ public void testDeleteFromAllPKColumnLocalIndex() throws Exception {
+ testDeleteFromAllPKColumnIndex(true);
+ }
+
+ public void testDeleteFromAllPKColumnIndex(boolean localIndex) throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
Connection conn = DriverManager.getConnection(getUrl(), props);
conn.setAutoCommit(false);
ensureTableCreated(getUrl(), INDEX_DATA_TABLE);
populateTestTable();
- String ddl = "CREATE INDEX IDX ON " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + INDEX_DATA_TABLE
+ String ddl = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX IDX ON " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + INDEX_DATA_TABLE
+ " (long_pk, varchar_pk)"
+ " INCLUDE (long_col1, long_col2)";
PreparedStatement stmt = conn.prepareStatement(ddl);
@@ -245,12 +269,21 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT {
@Test
public void testDropIfImmutableKeyValueColumn() throws Exception {
+ testDropIfImmutableKeyValueColumn(false);
+ }
+
+ @Test
+ public void testDropIfImmutableKeyValueColumnWithLocalIndex() throws Exception {
+ testDropIfImmutableKeyValueColumn(true);
+ }
+
+ public void testDropIfImmutableKeyValueColumn(boolean localIndex) throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
Connection conn = DriverManager.getConnection(getUrl(), props);
conn.setAutoCommit(false);
ensureTableCreated(getUrl(), INDEX_DATA_TABLE);
populateTestTable();
- String ddl = "CREATE INDEX IDX ON " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + INDEX_DATA_TABLE
+ String ddl = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX IDX ON " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + INDEX_DATA_TABLE
+ " (long_col1)";
PreparedStatement stmt = conn.prepareStatement(ddl);
stmt.execute();
@@ -279,12 +312,21 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT {
@Test
public void testGroupByCount() throws Exception {
+ testGroupByCount(false);
+ }
+
+ @Test
+ public void testGroupByCountWithLocalIndex() throws Exception {
+ testGroupByCount(true);
+ }
+
+ public void testGroupByCount(boolean localIndex) throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
Connection conn = DriverManager.getConnection(getUrl(), props);
conn.setAutoCommit(false);
ensureTableCreated(getUrl(), INDEX_DATA_TABLE);
populateTestTable();
- String ddl = "CREATE INDEX IDX ON " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + INDEX_DATA_TABLE
+ String ddl = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX IDX ON " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + INDEX_DATA_TABLE
+ " (int_col2)";
PreparedStatement stmt = conn.prepareStatement(ddl);
stmt.execute();
@@ -298,10 +340,19 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT {
@Test
public void testSelectDistinctOnTableWithSecondaryImmutableIndex() throws Exception {
+ testSelectDistinctOnTableWithSecondaryImmutableIndex(false);
+ }
+
+ @Test
+ public void testSelectDistinctOnTableWithSecondaryImmutableLocalIndex() throws Exception {
+ testSelectDistinctOnTableWithSecondaryImmutableIndex(true);
+ }
+
+ public void testSelectDistinctOnTableWithSecondaryImmutableIndex(boolean localIndex) throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
ensureTableCreated(getUrl(), INDEX_DATA_TABLE);
populateTestTable();
- String ddl = "CREATE INDEX IDX ON " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + INDEX_DATA_TABLE
+ String ddl = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX IDX ON " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + INDEX_DATA_TABLE
+ " (int_col2)";
Connection conn = null;
PreparedStatement stmt = null;
@@ -334,12 +385,21 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT {
@Test
public void testInClauseWithIndexOnColumnOfUsignedIntType() throws Exception {
+ testInClauseWithIndexOnColumnOfUsignedIntType(false);
+ }
+
+ @Test
+ public void testInClauseWithLocalIndexOnColumnOfUsignedIntType() throws Exception {
+ testInClauseWithIndexOnColumnOfUsignedIntType(true);
+ }
+
+ public void testInClauseWithIndexOnColumnOfUsignedIntType(boolean localIndex) throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
Connection conn = null;
PreparedStatement stmt = null;
ensureTableCreated(getUrl(), INDEX_DATA_TABLE);
populateTestTable();
- String ddl = "CREATE INDEX IDX ON " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + INDEX_DATA_TABLE
+ String ddl = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX IDX ON " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + INDEX_DATA_TABLE
+ " (int_col1)";
try {
try {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f28fb8b7/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
index 76dd281..21fb970 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
@@ -132,23 +132,6 @@ public class LocalIndexIT extends BaseIndexIT {
}
@Test
- public void testLocalIndexOnTableWithImmutableRows() throws Exception {
- createBaseTable(DATA_TABLE_NAME, null, null);
- Connection conn1 = DriverManager.getConnection(getUrl());
- Connection conn2 = DriverManager.getConnection(getUrl());
- try {
- conn1.createStatement().execute("ALTER TABLE " + DATA_TABLE_NAME + " SET IMMUTABLE_ROWS=true");
- conn1.createStatement().execute("CREATE LOCAL INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_NAME + "(v1)");
- fail("Local index aren't allowed on table with immutable rows");
- } catch (SQLException e) { }
- try {
- conn2.createStatement().executeQuery("SELECT * FROM " + DATA_TABLE_FULL_NAME).next();
- conn2.unwrap(PhoenixConnection.class).getMetaDataCache().getTable(new PTableKey(null,INDEX_TABLE_NAME));
- fail("Local index should not be created.");
- } catch (TableNotFoundException e) { }
- }
-
- @Test
public void testLocalIndexTableRegionSplitPolicyAndSplitKeys() throws Exception {
createBaseTable(DATA_TABLE_NAME, null,"('e','i','o')");
Connection conn1 = DriverManager.getConnection(getUrl());
@@ -579,6 +562,49 @@ public class LocalIndexIT extends BaseIndexIT {
}
@Test
+ public void testLocalIndexesOnTableWithImmutableRows() throws Exception {
+ createBaseTable(DATA_TABLE_NAME, null, "('e','i','o')");
+ Connection conn1 = DriverManager.getConnection(getUrl());
+ try {
+ conn1.createStatement().execute("ALTER TABLE "+ DATA_TABLE_NAME + " SET IMMUTABLE_ROWS=true");
+ conn1.createStatement().execute("CREATE LOCAL INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_NAME + "(v1)");
+ conn1.createStatement().execute("CREATE INDEX " + INDEX_TABLE_NAME + "2 ON " + DATA_TABLE_NAME + "(k3)");
+ conn1.commit();
+ conn1.createStatement().execute("UPSERT INTO " + DATA_TABLE_NAME + " values('b',1,2,4,'z')");
+ conn1.createStatement().execute("UPSERT INTO " + DATA_TABLE_NAME + " values('f',1,2,3,'a')");
+ conn1.createStatement().execute("UPSERT INTO " + DATA_TABLE_NAME + " values('j',2,4,2,'a')");
+ conn1.createStatement().execute("UPSERT INTO " + DATA_TABLE_NAME + " values('q',3,1,1,'c')");
+ conn1.commit();
+ conn1 = DriverManager.getConnection(getUrl());
+ ResultSet rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + DATA_TABLE_NAME);
+ assertTrue(rs.next());
+ assertEquals(4, rs.getInt(1));
+ rs = conn1.createStatement().executeQuery("SELECT v1 FROM " + DATA_TABLE_NAME);
+ assertTrue(rs.next());
+ assertEquals("a", rs.getString("v1"));
+ assertTrue(rs.next());
+ assertEquals("a", rs.getString("v1"));
+ assertTrue(rs.next());
+ assertEquals("c", rs.getString("v1"));
+ assertTrue(rs.next());
+ assertEquals("z", rs.getString("v1"));
+ assertFalse(rs.next());
+ rs = conn1.createStatement().executeQuery("SELECT k3 FROM " + DATA_TABLE_NAME);
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt("k3"));
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt("k3"));
+ assertTrue(rs.next());
+ assertEquals(3, rs.getInt("k3"));
+ assertTrue(rs.next());
+ assertEquals(4, rs.getInt("k3"));
+ assertFalse(rs.next());
+ } finally {
+ conn1.close();
+ }
+ }
+
+ @Test
public void testLocalIndexScanWithInList() throws Exception {
createBaseTable(DATA_TABLE_NAME, null, "('e','i','o')");
Connection conn1 = DriverManager.getConnection(getUrl());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f28fb8b7/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 59819b1..868c4cd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -70,6 +70,7 @@ import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.ReadOnlyTableException;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.MetaDataUtil;
@@ -170,9 +171,17 @@ public class DeleteCompiler {
if (!hasImmutableIndex(tableRef)) {
return false;
}
+ boolean isMultiTenant = tableRef.getTable().isMultiTenant();
for (PTable index : tableRef.getTable().getIndexes()) {
- for (PColumn column : index.getPKColumns()) {
- if (!IndexUtil.isDataPKColumn(column)) {
+ List<PColumn> pkColumns = index.getPKColumns();
+ boolean isLocalIndex = index.getIndexType() == IndexType.LOCAL;
+ int nIndexSaltBuckets =
+ index.getBucketNum() == null ? 0 : index.getBucketNum();
+ int numNonKVColumns =
+ (isMultiTenant ? 1 : 0) + (!isLocalIndex && nIndexSaltBuckets > 0 ? 1 : 0)
+ + (isLocalIndex ? 1 : 0);
+ for (int i = numNonKVColumns; i < pkColumns.size(); i++) {
+ if (!IndexUtil.isDataPKColumn(pkColumns.get(i))) {
return true;
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f28fb8b7/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexUpdate.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexUpdate.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexUpdate.java
index e3132d6..fd43d40 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexUpdate.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexUpdate.java
@@ -31,7 +31,7 @@ public class IndexUpdate {
byte[] tableName;
ColumnTracker columns;
- IndexUpdate(ColumnTracker tracker) {
+ public IndexUpdate(ColumnTracker tracker) {
this.columns = tracker;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f28fb8b7/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 68cdb26..f8c73fc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -135,6 +135,24 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
});
}
+ public static Iterator<PTable> enabledGlobalIndexIterator(Iterator<PTable> indexes) {
+ return Iterators.filter(indexes, new Predicate<PTable>() {
+ @Override
+ public boolean apply(PTable index) {
+ return !PIndexState.DISABLE.equals(index.getIndexState()) && !index.getIndexType().equals(IndexType.LOCAL);
+ }
+ });
+ }
+
+ public static Iterator<PTable> enabledLocalIndexIterator(Iterator<PTable> indexes) {
+ return Iterators.filter(indexes, new Predicate<PTable>() {
+ @Override
+ public boolean apply(PTable index) {
+ return !PIndexState.DISABLE.equals(index.getIndexState()) && index.getIndexType().equals(IndexType.LOCAL);
+ }
+ });
+ }
+
/**
* For client-side to serialize all IndexMaintainers for a given table
* @param dataTable data table
@@ -155,8 +173,11 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
List<PTable> indexes) {
Iterator<PTable> indexesItr = nonDisabledIndexIterator(indexes.iterator());
if ((dataTable.isImmutableRows()) || !indexesItr.hasNext()) {
- ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
- return;
+ indexesItr = enabledLocalIndexIterator(indexesItr);
+ if (!indexesItr.hasNext()) {
+ ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
+ return;
+ }
}
int nIndexes = 0;
int estimatedSize = dataTable.getRowKeySchema().getEstimatedByteSize() + 2;
@@ -172,7 +193,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
WritableUtils.writeVInt(output, nIndexes * (dataTable.getBucketNum() == null ? 1 : -1));
// Write out data row key schema once, since it's the same for all index maintainers
dataTable.getRowKeySchema().write(output);
- indexesItr = nonDisabledIndexIterator(indexes.iterator());
+ indexesItr =
+ dataTable.isImmutableRows() ? enabledLocalIndexIterator(indexes.iterator())
+ : nonDisabledIndexIterator(indexes.iterator());
while (indexesItr.hasNext()) {
indexesItr.next().getIndexMaintainer(dataTable).write(output);
}
@@ -227,6 +250,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
private int nDataCFs;
private boolean indexWALDisabled;
private boolean isLocalIndex;
+ private boolean immutableRows;
// Transient state
private final boolean isDataTableSalted;
@@ -299,6 +323,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
this.emptyKeyValueCFPtr = SchemaUtil.getEmptyColumnFamilyPtr(index);
this.nDataCFs = dataTable.getColumnFamilies().size();
this.indexWALDisabled = indexWALDisabled;
+ // TODO: check whether index is immutable or not. Currently it's always false so checking
+ // data table is with immutable rows or not.
+ this.immutableRows = dataTable.isImmutableRows();
}
public byte[] buildRowKey(ValueGetter valueGetter, ImmutableBytesWritable rowKeyPtr, byte[] regionStartKey, byte[] regionEndKey) {
@@ -862,8 +889,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
// Encode indexWALDisabled in nDataCFs
indexWALDisabled = nDataCFs < 0;
this.nDataCFs = Math.abs(nDataCFs) - 1;
- this.estimatedIndexRowKeyBytes = WritableUtils.readVInt(input);
-
+ int encodedEstimatedIndexRowKeyBytesAndImmutableRows = WritableUtils.readVInt(input);
+ this.immutableRows = encodedEstimatedIndexRowKeyBytesAndImmutableRows < 0;
+ this.estimatedIndexRowKeyBytes = Math.abs(encodedEstimatedIndexRowKeyBytesAndImmutableRows);
initCachedState();
}
@@ -898,7 +926,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
rowKeyMetaData.write(output);
// Encode indexWALDisabled in nDataCFs
WritableUtils.writeVInt(output, (nDataCFs + 1) * (indexWALDisabled ? -1 : 1));
- WritableUtils.writeVInt(output, estimatedIndexRowKeyBytes);
+ // Encode estimatedIndexRowKeyBytes and immutableRows together.
+ WritableUtils.writeVInt(output, estimatedIndexRowKeyBytes * (immutableRows ? -1 : 1));
}
public int getEstimatedByteSize() {
@@ -1149,7 +1178,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
return allColumns.iterator();
}
- public ValueGetter createGetterFromKeyValues(Collection<Cell> pendingUpdates) {
+ public ValueGetter createGetterFromKeyValues(Collection<? extends Cell> pendingUpdates) {
final Map<ReferencingColumn, ImmutableBytesPtr> valueMap = Maps.newHashMapWithExpectedSize(pendingUpdates
.size());
for (Cell kv : pendingUpdates) {
@@ -1167,4 +1196,16 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
}
};
}
+
+ public byte[] getDataEmptyKeyValueCF() {
+ return dataEmptyKeyValueCF;
+ }
+
+ public boolean isLocalIndex() {
+ return isLocalIndex;
+ }
+
+ public boolean isImmutableRows() {
+ return immutableRows;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f28fb8b7/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
index 317fa7b..de5a9cc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
@@ -20,11 +20,14 @@ package org.apache.phoenix.index;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -49,13 +52,24 @@ public class PhoenixIndexBuilder extends CoveredColumnsIndexBuilder {
// table rows being indexed into the block cache, as the index maintenance code
// does a point scan per row
List<KeyRange> keys = Lists.newArrayListWithExpectedSize(miniBatchOp.size());
- List<IndexMaintainer> maintainers = new ArrayList<IndexMaintainer>();
+ Map<ImmutableBytesWritable, IndexMaintainer> maintainers =
+ new HashMap<ImmutableBytesWritable, IndexMaintainer>();
+ ImmutableBytesWritable indexTableName = new ImmutableBytesWritable();
for (int i = 0; i < miniBatchOp.size(); i++) {
Mutation m = miniBatchOp.getOperation(i);
keys.add(PDataType.VARBINARY.getKeyRange(m.getRow()));
- maintainers.addAll(getCodec().getIndexMaintainers(m.getAttributesMap()));
+ List<IndexMaintainer> indexMaintainers = getCodec().getIndexMaintainers(m.getAttributesMap());
+
+ for(IndexMaintainer indexMaintainer: indexMaintainers) {
+ if (indexMaintainer.isImmutableRows() && indexMaintainer.isLocalIndex()) continue;
+ indexTableName.set(indexMaintainer.getIndexTableName());
+ if (maintainers.get(indexTableName) != null) continue;
+ maintainers.put(indexTableName, indexMaintainer);
+ }
+
}
- Scan scan = IndexManagementUtil.newLocalStateScan(maintainers);
+ if (maintainers.isEmpty()) return;
+ Scan scan = IndexManagementUtil.newLocalStateScan(new ArrayList<IndexMaintainer>(maintainers.values()));
ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
scanRanges.setScanStartStopRow(scan);
scan.setFilter(scanRanges.getSkipScanFilter());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f28fb8b7/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
index f061b8f..48a7868 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
@@ -40,12 +40,14 @@ import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.hbase.index.covered.IndexCodec;
import org.apache.phoenix.hbase.index.covered.IndexUpdate;
import org.apache.phoenix.hbase.index.covered.TableState;
+import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
import org.apache.phoenix.hbase.index.scanner.Scanner;
import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.hbase.index.write.IndexWriter;
+import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.ServerUtil;
@@ -109,41 +111,22 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
@Override
public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException {
- List<IndexMaintainer> indexMaintainers = getIndexMaintainers(state.getUpdateAttributes());
- if (indexMaintainers.isEmpty()) {
- return Collections.emptyList();
- }
- ImmutableBytesWritable ptr = new ImmutableBytesWritable();
- List<IndexUpdate> indexUpdates = Lists.newArrayList();
- // TODO: state.getCurrentRowKey() should take an ImmutableBytesWritable arg to prevent byte copy
- byte[] dataRowKey = state.getCurrentRowKey();
- for (IndexMaintainer maintainer : indexMaintainers) {
- // Short-circuit building state when we know it's a row deletion
- if (maintainer.isRowDeleted(state.getPendingUpdate())) {
- continue;
- }
-
- // Get a scanner over the columns this maintainer would like to look at
- // Any updates that we would make for those columns are then added to the index update
- Pair<Scanner,IndexUpdate> statePair = state.getIndexedColumnsTableState(maintainer.getAllColumns());
- IndexUpdate indexUpdate = statePair.getSecond();
- Scanner scanner = statePair.getFirst();
-
- // get the values from the scanner so we can actually use them
- ValueGetter valueGetter = IndexManagementUtil.createGetterFromScanner(scanner, dataRowKey);
- ptr.set(dataRowKey);
- Put put = maintainer.buildUpdateMutation(kvBuilder, valueGetter, ptr, state.getCurrentTimestamp(), env.getRegion().getStartKey(), env.getRegion().getEndKey());
- indexUpdate.setTable(maintainer.getIndexTableName());
- indexUpdate.setUpdate(put);
- //make sure we close the scanner when we are done
- scanner.close();
- indexUpdates.add(indexUpdate);
- }
- return indexUpdates;
+ return getIndexUpdates(state, true);
}
@Override
public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws IOException {
+ return getIndexUpdates(state, false);
+ }
+
+ /**
+ *
+ * @param state
+ * @param upsert prepare index upserts if it's true otherwise prepare index deletes.
+ * @return
+ * @throws IOException
+ */
+ private Iterable<IndexUpdate> getIndexUpdates(TableState state, boolean upsert) throws IOException {
List<IndexMaintainer> indexMaintainers = getIndexMaintainers(state.getUpdateAttributes());
if (indexMaintainers.isEmpty()) {
return Collections.emptyList();
@@ -152,19 +135,51 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
// TODO: state.getCurrentRowKey() should take an ImmutableBytesWritable arg to prevent byte copy
byte[] dataRowKey = state.getCurrentRowKey();
+ ptr.set(dataRowKey);
+ byte[] localIndexTableName = MetaDataUtil.getLocalIndexPhysicalName(env.getRegion().getTableDesc().getName());
+ ValueGetter valueGetter = null;
+ Scanner scanner = null;
for (IndexMaintainer maintainer : indexMaintainers) {
- // TODO: if more efficient, I could do this just once with all columns in all indexes
- Pair<Scanner,IndexUpdate> statePair = state.getIndexedColumnsTableState(maintainer.getAllColumns());
- Scanner scanner = statePair.getFirst();
- IndexUpdate indexUpdate = statePair.getSecond();
- indexUpdate.setTable(maintainer.getIndexTableName());
- ValueGetter valueGetter = IndexManagementUtil.createGetterFromScanner(scanner, dataRowKey);
- ptr.set(dataRowKey);
- Delete delete =
- maintainer.buildDeleteMutation(kvBuilder, valueGetter, ptr,
- state.getPendingUpdate(), state.getCurrentTimestamp(), env.getRegion().getStartKey(), env.getRegion().getEndKey());
- scanner.close();
- indexUpdate.setUpdate(delete);
+ if(upsert) {
+ // Short-circuit building state when we know it's a row deletion
+ if (maintainer.isRowDeleted(state.getPendingUpdate())) {
+ continue;
+ }
+ }
+ IndexUpdate indexUpdate = null;
+ if (maintainer.isImmutableRows()) {
+ indexUpdate = new IndexUpdate(new ColumnTracker(maintainer.getAllColumns()));
+ if(maintainer.isLocalIndex()) {
+ indexUpdate.setTable(localIndexTableName);
+ } else {
+ indexUpdate.setTable(maintainer.getIndexTableName());
+ }
+ valueGetter = maintainer.createGetterFromKeyValues(state.getPendingUpdate());
+ } else {
+ // TODO: if more efficient, I could do this just once with all columns in all indexes
+ Pair<Scanner,IndexUpdate> statePair = state.getIndexedColumnsTableState(maintainer.getAllColumns());
+ scanner = statePair.getFirst();
+ indexUpdate = statePair.getSecond();
+ indexUpdate.setTable(maintainer.getIndexTableName());
+ valueGetter = IndexManagementUtil.createGetterFromScanner(scanner, dataRowKey);
+ }
+ Mutation mutation = null;
+ if (upsert) {
+ mutation =
+ maintainer.buildUpdateMutation(kvBuilder, valueGetter, ptr, state
+ .getCurrentTimestamp(), env.getRegion().getStartKey(), env
+ .getRegion().getEndKey());
+ } else {
+ mutation =
+ maintainer.buildDeleteMutation(kvBuilder, valueGetter, ptr, state
+ .getPendingUpdate(), state.getCurrentTimestamp(), env.getRegion()
+ .getStartKey(), env.getRegion().getEndKey());
+ }
+ indexUpdate.setUpdate(mutation);
+ if (scanner != null) {
+ scanner.close();
+ scanner = null;
+ }
indexUpdates.add(indexUpdate);
}
return indexUpdates;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f28fb8b7/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 3b799bf..7f824e8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -758,9 +758,6 @@ public class MetaDataClient {
* 2) for a view on an index.
*/
if (statement.getIndexType() == IndexType.LOCAL || (dataTable.getType() == PTableType.VIEW && dataTable.getViewType() != ViewType.MAPPED)) {
- if (dataTable.isImmutableRows() && statement.getIndexType() == IndexType.LOCAL) {
- throw new SQLExceptionInfo.Builder(SQLExceptionCode.NO_LOCAL_INDEX_ON_TABLE_WITH_IMMUTABLE_ROWS).setTableName(indexTableName.getTableName()).build().buildException();
- }
allocateIndexId = true;
// Next add index ID column
PDataType dataType = MetaDataUtil.getViewIndexIdDataType();