You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ra...@apache.org on 2016/06/21 19:27:42 UTC

phoenix git commit: PHOENIX-2209 Building Local Index Asynchronously via IndexTool fails to populate index table(Rajeshbabu)

Repository: phoenix
Updated Branches:
  refs/heads/master e6f0b62de -> 9e03a48fb


PHOENIX-2209 Building Local Index Asynchronously via IndexTool fails to populate index table(Rajeshbabu)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/9e03a48f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/9e03a48f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/9e03a48f

Branch: refs/heads/master
Commit: 9e03a48fb3c76f4a53c11fc6ede21ad573f80157
Parents: e6f0b62
Author: Rajeshbabu Chintaguntla <ra...@apache.org>
Authored: Wed Jun 22 01:04:07 2016 +0530
Committer: Rajeshbabu Chintaguntla <ra...@apache.org>
Committed: Wed Jun 22 01:04:07 2016 +0530

----------------------------------------------------------------------
 .../org/apache/phoenix/end2end/IndexToolIT.java |  5 +-
 .../apache/phoenix/compile/UpsertCompiler.java  | 48 ++++++++++++++++++--
 .../java/org/apache/phoenix/util/IndexUtil.java | 21 +++++++++
 3 files changed, 69 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/9e03a48f/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index 9fb9e0a..cb013c8 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -80,6 +80,7 @@ public class IndexToolIT extends BaseOwnClusterHBaseManagedTimeIT {
                 optionBuilder.append(",");
             optionBuilder.append(" TRANSACTIONAL=true ");
         }
+        optionBuilder.append(" SPLIT ON(1,2)");
         this.tableDDLOptions = optionBuilder.toString();
     }
     
@@ -143,7 +144,7 @@ public class IndexToolIT extends BaseOwnClusterHBaseManagedTimeIT {
             String actualExplainPlan = QueryUtil.getExplainPlan(rs);
             
             //assert we are pulling from data table.
-            assertEquals(String.format("CLIENT 1-CHUNK PARALLEL 1-WAY ROUND ROBIN FULL SCAN OVER %s", fullTableName), actualExplainPlan);
+            assertEquals(String.format("CLIENT 3-CHUNK PARALLEL 1-WAY ROUND ROBIN FULL SCAN OVER %s", fullTableName), actualExplainPlan);
             
             rs = stmt1.executeQuery(selectSql);
             assertTrue(rs.next());
@@ -204,7 +205,7 @@ public class IndexToolIT extends BaseOwnClusterHBaseManagedTimeIT {
         String expectedExplainPlan = "";
         if(isLocal) {
             final String localIndexName = SchemaUtil.getTableName(schemaName, dataTable);
-            expectedExplainPlan = String.format("CLIENT 1-CHUNK PARALLEL 1-WAY ROUND ROBIN RANGE SCAN OVER %s [1]"
+            expectedExplainPlan = String.format("CLIENT 3-CHUNK PARALLEL 3-WAY ROUND ROBIN RANGE SCAN OVER %s [1]"
                 + "\n    SERVER FILTER BY FIRST KEY ONLY", localIndexName);
         } else {
             expectedExplainPlan = String.format("CLIENT 1-CHUNK PARALLEL 1-WAY ROUND ROBIN FULL SCAN OVER %s"

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9e03a48f/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index be6499b..26855aa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -113,7 +113,10 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 public class UpsertCompiler {
-    private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map<ImmutableBytesPtr,RowMutationState> mutation, PhoenixStatement statement, boolean useServerTimestamp) throws SQLException {
+    private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes,
+            PTable table, Map<ImmutableBytesPtr, RowMutationState> mutation,
+            PhoenixStatement statement, boolean useServerTimestamp, IndexMaintainer maintainer,
+            byte[][] viewConstants) throws SQLException {
         Map<PColumn,byte[]> columnValues = Maps.newHashMapWithExpectedSize(columnIndexes.length);
         byte[][] pkValues = new byte[table.getPKColumns().size()][];
         // If the table uses salting, the first byte is the salting byte, set to an empty array
@@ -144,6 +147,19 @@ public class UpsertCompiler {
         }
         ImmutableBytesPtr ptr = new ImmutableBytesPtr();
         table.newKey(ptr, pkValues);
+        if (table.getIndexType() == IndexType.LOCAL && maintainer != null) {
+            byte[] rowKey = maintainer.buildDataRowKey(ptr, viewConstants);
+            HRegionLocation region =
+                    statement.getConnection().getQueryServices()
+                            .getTableRegionLocation(table.getParentName().getBytes(), rowKey);
+            byte[] regionPrefix =
+                    region.getRegionInfo().getStartKey().length == 0 ? new byte[region
+                            .getRegionInfo().getEndKey().length] : region.getRegionInfo()
+                            .getStartKey();
+            if (regionPrefix.length != 0) {
+                ptr.set(ScanRanges.prefixKey(ptr.get(), 0, regionPrefix, regionPrefix.length));
+            }
+        } 
         mutation.put(ptr, new RowMutationState(columnValues, statement.getConnection().getStatementExecutionCounter(), rowTsColInfo));
     }
     
@@ -160,6 +176,19 @@ public class UpsertCompiler {
         int rowCount = 0;
         Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize);
         PTable table = tableRef.getTable();
+        IndexMaintainer indexMaintainer = null;
+        byte[][] viewConstants = null;
+        if (table.getIndexType() == IndexType.LOCAL) {
+            PTable parentTable =
+                    statement
+                            .getConnection()
+                            .getMetaDataCache()
+                            .getTableRef(
+                                new PTableKey(statement.getConnection().getTenantId(), table
+                                        .getParentName().getString())).getTable();
+            indexMaintainer = table.getIndexMaintainer(parentTable, connection);
+            viewConstants = IndexUtil.getViewConstants(parentTable);
+        }
         try (ResultSet rs = new PhoenixResultSet(iterator, projector, childContext)) {
             ImmutableBytesWritable ptr = new ImmutableBytesWritable();
             while (rs.next()) {
@@ -185,7 +214,7 @@ public class UpsertCompiler {
                             table.rowKeyOrderOptimizable());
                     values[i] = ByteUtil.copyKeyBytesIfNecessary(ptr);
                 }
-                setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp);
+                setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants);
                 rowCount++;
                 // Commit a batch if auto commit is true and we're at our batch size
                 if (isAutoCommit && rowCount % batchSize == 0) {
@@ -927,7 +956,20 @@ public class UpsertCompiler {
                     }
                 }
                 Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(1);
-                setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp);
+                IndexMaintainer indexMaintainer = null;
+                byte[][] viewConstants = null;
+                if (table.getIndexType() == IndexType.LOCAL) {
+                    PTable parentTable =
+                            statement
+                                    .getConnection()
+                                    .getMetaDataCache()
+                                    .getTableRef(
+                                        new PTableKey(statement.getConnection().getTenantId(),
+                                                table.getParentName().getString())).getTable();
+                    indexMaintainer = table.getIndexMaintainer(parentTable, connection);
+                    viewConstants = IndexUtil.getViewConstants(parentTable);
+                }
+                setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants);
                 return new MutationState(tableRef, mutation, 0, maxSize, connection);
             }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9e03a48f/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 5532d71..86fa8ca 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -77,6 +77,7 @@ import org.apache.phoenix.schema.KeyValueSchema;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnFamily;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.ResultTuple;
@@ -638,4 +639,24 @@ public class IndexUtil {
         return col.getExpressionStr() == null ? IndexUtil.getCaseSensitiveDataColumnFullName(col.getName().getString())
                 : col.getExpressionStr();
     }
+
+    public static byte[][] getViewConstants(PTable dataTable) {
+        if (dataTable.getType() != PTableType.VIEW) return null;
+        int dataPosOffset = (dataTable.getBucketNum() != null ? 1 : 0) + (dataTable.isMultiTenant() ? 1 : 0);
+        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+        List<byte[]> viewConstants = new ArrayList<byte[]>();
+        List<PColumn> dataPkColumns = dataTable.getPKColumns();
+        for (int i = dataPosOffset; i < dataPkColumns.size(); i++) {
+            PColumn dataPKColumn = dataPkColumns.get(i);
+            if (dataPKColumn.getViewConstant() != null) {
+                if (IndexUtil.getViewConstantValue(dataPKColumn, ptr)) {
+                    viewConstants.add(ByteUtil.copyKeyBytesIfNecessary(ptr));
+                } else {
+                    throw new IllegalStateException();
+                }
+            }
+        }
+        return viewConstants.isEmpty() ? null : viewConstants
+                .toArray(new byte[viewConstants.size()][]);
+    }
 }