You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ra...@apache.org on 2016/06/21 19:27:42 UTC
phoenix git commit: PHOENIX-2209 Building Local Index Asynchronously via IndexTool fails to populate index table(Rajeshbabu)
Repository: phoenix
Updated Branches:
refs/heads/master e6f0b62de -> 9e03a48fb
PHOENIX-2209 Building Local Index Asynchronously via IndexTool fails to populate index table(Rajeshbabu)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/9e03a48f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/9e03a48f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/9e03a48f
Branch: refs/heads/master
Commit: 9e03a48fb3c76f4a53c11fc6ede21ad573f80157
Parents: e6f0b62
Author: Rajeshbabu Chintaguntla <ra...@apache.org>
Authored: Wed Jun 22 01:04:07 2016 +0530
Committer: Rajeshbabu Chintaguntla <ra...@apache.org>
Committed: Wed Jun 22 01:04:07 2016 +0530
----------------------------------------------------------------------
.../org/apache/phoenix/end2end/IndexToolIT.java | 5 +-
.../apache/phoenix/compile/UpsertCompiler.java | 48 ++++++++++++++++++--
.../java/org/apache/phoenix/util/IndexUtil.java | 21 +++++++++
3 files changed, 69 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9e03a48f/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index 9fb9e0a..cb013c8 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -80,6 +80,7 @@ public class IndexToolIT extends BaseOwnClusterHBaseManagedTimeIT {
optionBuilder.append(",");
optionBuilder.append(" TRANSACTIONAL=true ");
}
+ optionBuilder.append(" SPLIT ON(1,2)");
this.tableDDLOptions = optionBuilder.toString();
}
@@ -143,7 +144,7 @@ public class IndexToolIT extends BaseOwnClusterHBaseManagedTimeIT {
String actualExplainPlan = QueryUtil.getExplainPlan(rs);
//assert we are pulling from data table.
- assertEquals(String.format("CLIENT 1-CHUNK PARALLEL 1-WAY ROUND ROBIN FULL SCAN OVER %s", fullTableName), actualExplainPlan);
+ assertEquals(String.format("CLIENT 3-CHUNK PARALLEL 1-WAY ROUND ROBIN FULL SCAN OVER %s", fullTableName), actualExplainPlan);
rs = stmt1.executeQuery(selectSql);
assertTrue(rs.next());
@@ -204,7 +205,7 @@ public class IndexToolIT extends BaseOwnClusterHBaseManagedTimeIT {
String expectedExplainPlan = "";
if(isLocal) {
final String localIndexName = SchemaUtil.getTableName(schemaName, dataTable);
- expectedExplainPlan = String.format("CLIENT 1-CHUNK PARALLEL 1-WAY ROUND ROBIN RANGE SCAN OVER %s [1]"
+ expectedExplainPlan = String.format("CLIENT 3-CHUNK PARALLEL 3-WAY ROUND ROBIN RANGE SCAN OVER %s [1]"
+ "\n SERVER FILTER BY FIRST KEY ONLY", localIndexName);
} else {
expectedExplainPlan = String.format("CLIENT 1-CHUNK PARALLEL 1-WAY ROUND ROBIN FULL SCAN OVER %s"
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9e03a48f/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index be6499b..26855aa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -113,7 +113,10 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
public class UpsertCompiler {
- private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map<ImmutableBytesPtr,RowMutationState> mutation, PhoenixStatement statement, boolean useServerTimestamp) throws SQLException {
+ private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes,
+ PTable table, Map<ImmutableBytesPtr, RowMutationState> mutation,
+ PhoenixStatement statement, boolean useServerTimestamp, IndexMaintainer maintainer,
+ byte[][] viewConstants) throws SQLException {
Map<PColumn,byte[]> columnValues = Maps.newHashMapWithExpectedSize(columnIndexes.length);
byte[][] pkValues = new byte[table.getPKColumns().size()][];
// If the table uses salting, the first byte is the salting byte, set to an empty array
@@ -144,6 +147,19 @@ public class UpsertCompiler {
}
ImmutableBytesPtr ptr = new ImmutableBytesPtr();
table.newKey(ptr, pkValues);
+ if (table.getIndexType() == IndexType.LOCAL && maintainer != null) {
+ byte[] rowKey = maintainer.buildDataRowKey(ptr, viewConstants);
+ HRegionLocation region =
+ statement.getConnection().getQueryServices()
+ .getTableRegionLocation(table.getParentName().getBytes(), rowKey);
+ byte[] regionPrefix =
+ region.getRegionInfo().getStartKey().length == 0 ? new byte[region
+ .getRegionInfo().getEndKey().length] : region.getRegionInfo()
+ .getStartKey();
+ if (regionPrefix.length != 0) {
+ ptr.set(ScanRanges.prefixKey(ptr.get(), 0, regionPrefix, regionPrefix.length));
+ }
+ }
mutation.put(ptr, new RowMutationState(columnValues, statement.getConnection().getStatementExecutionCounter(), rowTsColInfo));
}
@@ -160,6 +176,19 @@ public class UpsertCompiler {
int rowCount = 0;
Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize);
PTable table = tableRef.getTable();
+ IndexMaintainer indexMaintainer = null;
+ byte[][] viewConstants = null;
+ if (table.getIndexType() == IndexType.LOCAL) {
+ PTable parentTable =
+ statement
+ .getConnection()
+ .getMetaDataCache()
+ .getTableRef(
+ new PTableKey(statement.getConnection().getTenantId(), table
+ .getParentName().getString())).getTable();
+ indexMaintainer = table.getIndexMaintainer(parentTable, connection);
+ viewConstants = IndexUtil.getViewConstants(parentTable);
+ }
try (ResultSet rs = new PhoenixResultSet(iterator, projector, childContext)) {
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
while (rs.next()) {
@@ -185,7 +214,7 @@ public class UpsertCompiler {
table.rowKeyOrderOptimizable());
values[i] = ByteUtil.copyKeyBytesIfNecessary(ptr);
}
- setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp);
+ setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants);
rowCount++;
// Commit a batch if auto commit is true and we're at our batch size
if (isAutoCommit && rowCount % batchSize == 0) {
@@ -927,7 +956,20 @@ public class UpsertCompiler {
}
}
Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(1);
- setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp);
+ IndexMaintainer indexMaintainer = null;
+ byte[][] viewConstants = null;
+ if (table.getIndexType() == IndexType.LOCAL) {
+ PTable parentTable =
+ statement
+ .getConnection()
+ .getMetaDataCache()
+ .getTableRef(
+ new PTableKey(statement.getConnection().getTenantId(),
+ table.getParentName().getString())).getTable();
+ indexMaintainer = table.getIndexMaintainer(parentTable, connection);
+ viewConstants = IndexUtil.getViewConstants(parentTable);
+ }
+ setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants);
return new MutationState(tableRef, mutation, 0, maxSize, connection);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9e03a48f/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 5532d71..86fa8ca 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -77,6 +77,7 @@ import org.apache.phoenix.schema.KeyValueSchema;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PColumnFamily;
import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.tuple.ResultTuple;
@@ -638,4 +639,24 @@ public class IndexUtil {
return col.getExpressionStr() == null ? IndexUtil.getCaseSensitiveDataColumnFullName(col.getName().getString())
: col.getExpressionStr();
}
+
+ public static byte[][] getViewConstants(PTable dataTable) {
+ if (dataTable.getType() != PTableType.VIEW) return null;
+ int dataPosOffset = (dataTable.getBucketNum() != null ? 1 : 0) + (dataTable.isMultiTenant() ? 1 : 0);
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ List<byte[]> viewConstants = new ArrayList<byte[]>();
+ List<PColumn> dataPkColumns = dataTable.getPKColumns();
+ for (int i = dataPosOffset; i < dataPkColumns.size(); i++) {
+ PColumn dataPKColumn = dataPkColumns.get(i);
+ if (dataPKColumn.getViewConstant() != null) {
+ if (IndexUtil.getViewConstantValue(dataPKColumn, ptr)) {
+ viewConstants.add(ByteUtil.copyKeyBytesIfNecessary(ptr));
+ } else {
+ throw new IllegalStateException();
+ }
+ }
+ }
+ return viewConstants.isEmpty() ? null : viewConstants
+ .toArray(new byte[viewConstants.size()][]);
+ }
}