You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/09/26 05:36:41 UTC

[1/2] phoenix git commit: PHOENIX-4230 Write index updates in postBatchMutateIndispensably for transactional tables

Repository: phoenix
Updated Branches:
  refs/heads/master 944bed735 -> 94601de5f


PHOENIX-4230 Write index updates in postBatchMutateIndispensably for transactional tables


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d13a2e5b
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d13a2e5b
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d13a2e5b

Branch: refs/heads/master
Commit: d13a2e5b27db8d22344442a9fc9890a37052f0f9
Parents: 944bed7
Author: James Taylor <jt...@salesforce.com>
Authored: Mon Sep 25 18:52:48 2017 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Mon Sep 25 18:52:48 2017 -0700

----------------------------------------------------------------------
 .../index/PhoenixTransactionalIndexer.java      | 79 +++++++++++++++++---
 1 file changed, 70 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/d13a2e5b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 5444360..969378d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -17,6 +17,11 @@
  */
 package org.apache.phoenix.index;
 
+import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.DEFAULT_INDEX_WRITER_RPC_PAUSE;
+import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER;
+import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER_RPC_PAUSE;
+import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER_RPC_RETRIES_NUMBER;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -36,6 +41,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.TableName;
@@ -99,6 +105,15 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
 
     private static final Log LOG = LogFactory.getLog(PhoenixTransactionalIndexer.class);
 
+    // Hack to get around not being able to save any state between
+    // coprocessor calls. TODO: remove after HBASE-18127 when available
+    private static class BatchMutateContext {
+        public Collection<Pair<Mutation, byte[]>> indexUpdates = Collections.emptyList();
+    }
+    
+    private ThreadLocal<BatchMutateContext> batchMutateContext =
+            new ThreadLocal<BatchMutateContext>();
+    
     private PhoenixIndexCodec codec;
     private IndexWriter writer;
     private boolean stopped;
@@ -117,9 +132,15 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
          */
         clonedConfig.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
                 InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class);
+        // lower the number of rpc retries.  We inherit config from HConnectionManager#setServerSideHConnectionRetries,
+        // which by default uses a multiplier of 10.  That is too many retries for our synchronous index writes
+        clonedConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+            env.getConfiguration().getInt(INDEX_WRITER_RPC_RETRIES_NUMBER,
+                DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER));
+        clonedConfig.setInt(HConstants.HBASE_CLIENT_PAUSE, env.getConfiguration()
+            .getInt(INDEX_WRITER_RPC_PAUSE, DEFAULT_INDEX_WRITER_RPC_PAUSE));
         DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(clonedConfig, env);
         // setup the actual index writer
-        // setup the actual index writer
         // For transactional tables, we keep the index active upon a write failure
         // since we have the all versus none behavior for transactions.
         this.writer = new IndexWriter(new LeaveIndexActiveFailurePolicy(), indexWriterEnv, serverName + "-tx-index-writer");
@@ -154,6 +175,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
             
         };
     }
+    
     @Override
     public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
             MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
@@ -164,6 +186,9 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
             return;
         }
 
+        BatchMutateContext context = new BatchMutateContext();
+        setBatchMutateContext(c, context);
+        
         Map<String,byte[]> updateAttributes = m.getAttributesMap();
         PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(c.getEnvironment(),updateAttributes);
         byte[] txRollbackAttribute = m.getAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY);
@@ -176,15 +201,10 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
             }
 
             // get the index updates for all elements in this batch
-            indexUpdates = getIndexUpdates(c.getEnvironment(), indexMetaData, getMutationIterator(miniBatchOp), txRollbackAttribute);
+            context.indexUpdates = getIndexUpdates(c.getEnvironment(), indexMetaData, getMutationIterator(miniBatchOp), txRollbackAttribute);
 
             current.addTimelineAnnotation("Built index updates, doing preStep");
-            TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());
-
-            // no index updates, so we are done
-            if (!indexUpdates.isEmpty()) {
-                this.writer.write(indexUpdates, true);
-            }
+            TracingUtils.addAnnotation(current, "index update count", context.indexUpdates.size());
         } catch (Throwable t) {
             String msg = "Failed to update index with entries:" + indexUpdates;
             LOG.error(msg, t);
@@ -192,7 +212,48 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
         }
     }
 
-    public static void addMutation(Map<ImmutableBytesPtr, MultiMutation> mutations, ImmutableBytesPtr row, Mutation m) {
+    @Override
+    public void postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> c,
+        MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success) throws IOException {
+        BatchMutateContext context = getBatchMutateContext(c);
+        if (context == null || context.indexUpdates == null) {
+            return;
+        }
+        // get the current span, or just use a null-span to avoid a bunch of if statements
+        try (TraceScope scope = Trace.startSpan("Starting to write index updates")) {
+            Span current = scope.getSpan();
+            if (current == null) {
+                current = NullSpan.INSTANCE;
+            }
+
+            if (success) { // if miniBatchOp was successfully written, write index updates
+                if (!context.indexUpdates.isEmpty()) {
+                    this.writer.write(context.indexUpdates, true);
+                }
+                current.addTimelineAnnotation("Wrote index updates");
+            }
+        } catch (Throwable t) {
+            String msg = "Failed to write index updates:" + context.indexUpdates;
+            LOG.error(msg, t);
+            ServerUtil.throwIOException(msg, t);
+         } finally {
+             removeBatchMutateContext(c);
+         }
+    }
+
+    private void setBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context) {
+        this.batchMutateContext.set(context);
+    }
+    
+    private BatchMutateContext getBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) {
+        return this.batchMutateContext.get();
+    }
+    
+    private void removeBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) {
+        this.batchMutateContext.remove();
+    }
+
+    private static void addMutation(Map<ImmutableBytesPtr, MultiMutation> mutations, ImmutableBytesPtr row, Mutation m) {
         MultiMutation stored = mutations.get(row);
         // we haven't seen this row before, so add it
         if (stored == null) {


[2/2] phoenix git commit: PHOENIX-4233 IndexScrutiny test tool does not work for salted and shared index tables

Posted by ja...@apache.org.
PHOENIX-4233 IndexScrutiny test tool does not work for salted and shared index tables


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/94601de5
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/94601de5
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/94601de5

Branch: refs/heads/master
Commit: 94601de5f5f966fb8bcd1a069409bee460bf2400
Parents: d13a2e5
Author: James Taylor <jt...@salesforce.com>
Authored: Mon Sep 25 22:33:28 2017 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Mon Sep 25 22:33:28 2017 -0700

----------------------------------------------------------------------
 .../apache/phoenix/util/IndexScrutinyIT.java    |  4 +-
 .../org/apache/phoenix/util/IndexScrutiny.java  | 47 ++++++++++++++------
 2 files changed, 36 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/94601de5/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
index a5ec83f..3277e32 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
@@ -35,7 +35,7 @@ public class IndexScrutinyIT extends ParallelStatsDisabledIT {
         String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
         String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
         try (Connection conn = DriverManager.getConnection(getUrl())) {
-            conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true");
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true, SALT_BUCKETS=2");
             conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v)");
             conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb')");
             conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc')");
@@ -61,7 +61,7 @@ public class IndexScrutinyIT extends ParallelStatsDisabledIT {
         String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
         try (Connection conn = DriverManager.getConnection(getUrl())) {
             conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR, v2 VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true");
-            conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v) INCLUDE (v2)");
+            conn.createStatement().execute("CREATE LOCAL INDEX " + indexName + " ON " + fullTableName + " (v) INCLUDE (v2)");
             conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb','0')");
             conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc','1')");
             conn.commit();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/94601de5/phoenix-core/src/test/java/org/apache/phoenix/util/IndexScrutiny.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/IndexScrutiny.java b/phoenix-core/src/test/java/org/apache/phoenix/util/IndexScrutiny.java
index c78658d..380e718 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/IndexScrutiny.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/IndexScrutiny.java
@@ -25,6 +25,7 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
+import java.util.List;
 
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.schema.PColumn;
@@ -39,20 +40,39 @@ public class IndexScrutiny {
     public static long scrutinizeIndex(Connection conn, String fullTableName, String fullIndexName) throws SQLException {
         PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
         PTable ptable = pconn.getTable(new PTableKey(pconn.getTenantId(), fullTableName));
+        int tableColumnOffset = 0;
+        List<PColumn> tableColumns = ptable.getColumns();
+        List<PColumn> tablePKColumns = ptable.getPKColumns();
+        if (ptable.getBucketNum() != null) {
+            tableColumnOffset = 1;
+            tableColumns = tableColumns.subList(tableColumnOffset, tableColumns.size());
+            tablePKColumns = tablePKColumns.subList(tableColumnOffset, tablePKColumns.size());
+        }
         PTable pindex = pconn.getTable(new PTableKey(pconn.getTenantId(), fullIndexName));
+        List<PColumn> indexColumns = pindex.getColumns();
+        int indexColumnOffset = 0;
+        if (pindex.getBucketNum() != null) {
+            indexColumnOffset = 1;
+        }
+        if (pindex.getViewIndexId() != null) {
+            indexColumnOffset++;
+        }
+        if (indexColumnOffset > 0) {
+            indexColumns = indexColumns.subList(indexColumnOffset, indexColumns.size());
+        }
         StringBuilder indexQueryBuf = new StringBuilder("SELECT ");
-        for (PColumn dcol : ptable.getPKColumns()) {
+        for (PColumn dcol : tablePKColumns) {
             indexQueryBuf.append("CAST(\"" + IndexUtil.getIndexColumnName(dcol) + "\" AS " + dcol.getDataType().getSqlTypeName() + ")");
             indexQueryBuf.append(",");
         }
-        for (PColumn icol : pindex.getColumns()) {
+        for (PColumn icol :indexColumns) {
             PColumn dcol = IndexUtil.getDataColumn(ptable, icol.getName().getString());
             if (SchemaUtil.isPKColumn(icol) && !SchemaUtil.isPKColumn(dcol)) {
                 indexQueryBuf.append("CAST (\"" + icol.getName().getString() + "\" AS " + dcol.getDataType().getSqlTypeName() + ")");
                 indexQueryBuf.append(",");
             }
         }
-        for (PColumn icol : pindex.getColumns()) {
+        for (PColumn icol : indexColumns) {
             if (!SchemaUtil.isPKColumn(icol)) {
                 PColumn dcol = IndexUtil.getDataColumn(ptable, icol.getName().getString());
                 indexQueryBuf.append("CAST (\"" + icol.getName().getString() + "\" AS " + dcol.getDataType().getSqlTypeName() + ")");
@@ -63,11 +83,11 @@ public class IndexScrutiny {
         indexQueryBuf.append("\nFROM " + fullIndexName);
         
         StringBuilder tableQueryBuf = new StringBuilder("SELECT ");
-        for (PColumn dcol : ptable.getPKColumns()) {
+        for (PColumn dcol : tablePKColumns) {
             tableQueryBuf.append("\"" + dcol.getName().getString() + "\"");
             tableQueryBuf.append(",");
         }
-        for (PColumn icol : pindex.getColumns()) {
+        for (PColumn icol : indexColumns) {
             PColumn dcol = IndexUtil.getDataColumn(ptable, icol.getName().getString());
             if (SchemaUtil.isPKColumn(icol) && !SchemaUtil.isPKColumn(dcol)) {
                 if (dcol.getFamilyName() != null) {
@@ -78,7 +98,7 @@ public class IndexScrutiny {
                 tableQueryBuf.append(",");
             }
         }
-        for (PColumn icol : pindex.getColumns()) {
+        for (PColumn icol : indexColumns) {
             if (!SchemaUtil.isPKColumn(icol)) {
                 PColumn dcol = IndexUtil.getDataColumn(ptable, icol.getName().getString());
                 if (dcol.getFamilyName() != null) {
@@ -91,13 +111,13 @@ public class IndexScrutiny {
         }
         tableQueryBuf.setLength(tableQueryBuf.length()-1);
         tableQueryBuf.append("\nFROM " + fullTableName + "\nWHERE (");
-        for (PColumn dcol : ptable.getPKColumns()) {
+        for (PColumn dcol : tablePKColumns) {
             tableQueryBuf.append("\"" + dcol.getName().getString() + "\"");
             tableQueryBuf.append(",");
         }
         tableQueryBuf.setLength(tableQueryBuf.length()-1);
         tableQueryBuf.append(") = ((");
-        for (int i = 0; i < ptable.getPKColumns().size(); i++) {
+        for (int i = 0; i < tablePKColumns.size(); i++) {
             tableQueryBuf.append("?");
             tableQueryBuf.append(",");
         }
@@ -114,11 +134,12 @@ public class IndexScrutiny {
         while (irs.next()) {
             icount++;
             StringBuilder pkBuf = new StringBuilder("(");
-            for (int i = 0; i < ptable.getPKColumns().size(); i++) {
-                PColumn dcol = ptable.getPKColumns().get(i);
-                Object pkVal = irs.getObject(i+1);
-                PDataType pkType = PDataType.fromTypeId(irsmd.getColumnType(i + 1));
-                istmt.setObject(i+1, pkVal, dcol.getDataType().getSqlType());
+            for (int i = 0; i < tablePKColumns.size(); i++) {
+                PColumn dcol = tablePKColumns.get(i);
+                int offset = i+1;
+                Object pkVal = irs.getObject(offset);
+                PDataType pkType = PDataType.fromTypeId(irsmd.getColumnType(offset));
+                istmt.setObject(offset, pkVal, dcol.getDataType().getSqlType());
                 pkBuf.append(pkType.toStringLiteral(pkVal));
                 pkBuf.append(",");
             }