You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/09/26 05:36:41 UTC
[1/2] phoenix git commit: PHOENIX-4230 Write index updates in postBatchMutateIndispensably for transactional tables
Repository: phoenix
Updated Branches:
refs/heads/master 944bed735 -> 94601de5f
PHOENIX-4230 Write index updates in postBatchMutateIndispensably for transactional tables
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d13a2e5b
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d13a2e5b
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d13a2e5b
Branch: refs/heads/master
Commit: d13a2e5b27db8d22344442a9fc9890a37052f0f9
Parents: 944bed7
Author: James Taylor <jt...@salesforce.com>
Authored: Mon Sep 25 18:52:48 2017 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Mon Sep 25 18:52:48 2017 -0700
----------------------------------------------------------------------
.../index/PhoenixTransactionalIndexer.java | 79 +++++++++++++++++---
1 file changed, 70 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d13a2e5b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 5444360..969378d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -17,6 +17,11 @@
*/
package org.apache.phoenix.index;
+import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.DEFAULT_INDEX_WRITER_RPC_PAUSE;
+import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER;
+import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER_RPC_PAUSE;
+import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER_RPC_RETRIES_NUMBER;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -36,6 +41,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
@@ -99,6 +105,15 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
private static final Log LOG = LogFactory.getLog(PhoenixTransactionalIndexer.class);
+ // Hack to get around not being able to save any state between
+ // coprocessor calls. TODO: remove after HBASE-18127 when available
+ private static class BatchMutateContext {
+ public Collection<Pair<Mutation, byte[]>> indexUpdates = Collections.emptyList();
+ }
+
+ private ThreadLocal<BatchMutateContext> batchMutateContext =
+ new ThreadLocal<BatchMutateContext>();
+
private PhoenixIndexCodec codec;
private IndexWriter writer;
private boolean stopped;
@@ -117,9 +132,15 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
*/
clonedConfig.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class);
+ // lower the number of rpc retries. We inherit config from HConnectionManager#setServerSideHConnectionRetries,
+ // which by default uses a multiplier of 10. That is too many retries for our synchronous index writes
+ clonedConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+ env.getConfiguration().getInt(INDEX_WRITER_RPC_RETRIES_NUMBER,
+ DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER));
+ clonedConfig.setInt(HConstants.HBASE_CLIENT_PAUSE, env.getConfiguration()
+ .getInt(INDEX_WRITER_RPC_PAUSE, DEFAULT_INDEX_WRITER_RPC_PAUSE));
DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(clonedConfig, env);
// setup the actual index writer
- // setup the actual index writer
// For transactional tables, we keep the index active upon a write failure
// since we have the all versus none behavior for transactions.
this.writer = new IndexWriter(new LeaveIndexActiveFailurePolicy(), indexWriterEnv, serverName + "-tx-index-writer");
@@ -154,6 +175,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
};
}
+
@Override
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
@@ -164,6 +186,9 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
return;
}
+ BatchMutateContext context = new BatchMutateContext();
+ setBatchMutateContext(c, context);
+
Map<String,byte[]> updateAttributes = m.getAttributesMap();
PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(c.getEnvironment(),updateAttributes);
byte[] txRollbackAttribute = m.getAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY);
@@ -176,15 +201,10 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
}
// get the index updates for all elements in this batch
- indexUpdates = getIndexUpdates(c.getEnvironment(), indexMetaData, getMutationIterator(miniBatchOp), txRollbackAttribute);
+ context.indexUpdates = getIndexUpdates(c.getEnvironment(), indexMetaData, getMutationIterator(miniBatchOp), txRollbackAttribute);
current.addTimelineAnnotation("Built index updates, doing preStep");
- TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());
-
- // no index updates, so we are done
- if (!indexUpdates.isEmpty()) {
- this.writer.write(indexUpdates, true);
- }
+ TracingUtils.addAnnotation(current, "index update count", context.indexUpdates.size());
} catch (Throwable t) {
String msg = "Failed to update index with entries:" + indexUpdates;
LOG.error(msg, t);
@@ -192,7 +212,48 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
}
}
- public static void addMutation(Map<ImmutableBytesPtr, MultiMutation> mutations, ImmutableBytesPtr row, Mutation m) {
+ @Override
+ public void postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> c,
+ MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success) throws IOException {
+ BatchMutateContext context = getBatchMutateContext(c);
+ if (context == null || context.indexUpdates == null) {
+ return;
+ }
+ // get the current span, or just use a null-span to avoid a bunch of if statements
+ try (TraceScope scope = Trace.startSpan("Starting to write index updates")) {
+ Span current = scope.getSpan();
+ if (current == null) {
+ current = NullSpan.INSTANCE;
+ }
+
+ if (success) { // if miniBatchOp was successfully written, write index updates
+ if (!context.indexUpdates.isEmpty()) {
+ this.writer.write(context.indexUpdates, true);
+ }
+ current.addTimelineAnnotation("Wrote index updates");
+ }
+ } catch (Throwable t) {
+ String msg = "Failed to write index updates:" + context.indexUpdates;
+ LOG.error(msg, t);
+ ServerUtil.throwIOException(msg, t);
+ } finally {
+ removeBatchMutateContext(c);
+ }
+ }
+
+ private void setBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context) {
+ this.batchMutateContext.set(context);
+ }
+
+ private BatchMutateContext getBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) {
+ return this.batchMutateContext.get();
+ }
+
+ private void removeBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) {
+ this.batchMutateContext.remove();
+ }
+
+ private static void addMutation(Map<ImmutableBytesPtr, MultiMutation> mutations, ImmutableBytesPtr row, Mutation m) {
MultiMutation stored = mutations.get(row);
// we haven't seen this row before, so add it
if (stored == null) {
[2/2] phoenix git commit: PHOENIX-4233 IndexScrutiny test tool does not work for salted and shared index tables
Posted by ja...@apache.org.
PHOENIX-4233 IndexScrutiny test tool does not work for salted and shared index tables
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/94601de5
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/94601de5
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/94601de5
Branch: refs/heads/master
Commit: 94601de5f5f966fb8bcd1a069409bee460bf2400
Parents: d13a2e5
Author: James Taylor <jt...@salesforce.com>
Authored: Mon Sep 25 22:33:28 2017 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Mon Sep 25 22:33:28 2017 -0700
----------------------------------------------------------------------
.../apache/phoenix/util/IndexScrutinyIT.java | 4 +-
.../org/apache/phoenix/util/IndexScrutiny.java | 47 ++++++++++++++------
2 files changed, 36 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/94601de5/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
index a5ec83f..3277e32 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
@@ -35,7 +35,7 @@ public class IndexScrutinyIT extends ParallelStatsDisabledIT {
String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
try (Connection conn = DriverManager.getConnection(getUrl())) {
- conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true");
+ conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true, SALT_BUCKETS=2");
conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v)");
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb')");
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc')");
@@ -61,7 +61,7 @@ public class IndexScrutinyIT extends ParallelStatsDisabledIT {
String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
try (Connection conn = DriverManager.getConnection(getUrl())) {
conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR, v2 VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true");
- conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v) INCLUDE (v2)");
+ conn.createStatement().execute("CREATE LOCAL INDEX " + indexName + " ON " + fullTableName + " (v) INCLUDE (v2)");
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb','0')");
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc','1')");
conn.commit();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/94601de5/phoenix-core/src/test/java/org/apache/phoenix/util/IndexScrutiny.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/IndexScrutiny.java b/phoenix-core/src/test/java/org/apache/phoenix/util/IndexScrutiny.java
index c78658d..380e718 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/IndexScrutiny.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/IndexScrutiny.java
@@ -25,6 +25,7 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
+import java.util.List;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.schema.PColumn;
@@ -39,20 +40,39 @@ public class IndexScrutiny {
public static long scrutinizeIndex(Connection conn, String fullTableName, String fullIndexName) throws SQLException {
PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
PTable ptable = pconn.getTable(new PTableKey(pconn.getTenantId(), fullTableName));
+ int tableColumnOffset = 0;
+ List<PColumn> tableColumns = ptable.getColumns();
+ List<PColumn> tablePKColumns = ptable.getPKColumns();
+ if (ptable.getBucketNum() != null) {
+ tableColumnOffset = 1;
+ tableColumns = tableColumns.subList(tableColumnOffset, tableColumns.size());
+ tablePKColumns = tablePKColumns.subList(tableColumnOffset, tablePKColumns.size());
+ }
PTable pindex = pconn.getTable(new PTableKey(pconn.getTenantId(), fullIndexName));
+ List<PColumn> indexColumns = pindex.getColumns();
+ int indexColumnOffset = 0;
+ if (pindex.getBucketNum() != null) {
+ indexColumnOffset = 1;
+ }
+ if (pindex.getViewIndexId() != null) {
+ indexColumnOffset++;
+ }
+ if (indexColumnOffset > 0) {
+ indexColumns = indexColumns.subList(indexColumnOffset, indexColumns.size());
+ }
StringBuilder indexQueryBuf = new StringBuilder("SELECT ");
- for (PColumn dcol : ptable.getPKColumns()) {
+ for (PColumn dcol : tablePKColumns) {
indexQueryBuf.append("CAST(\"" + IndexUtil.getIndexColumnName(dcol) + "\" AS " + dcol.getDataType().getSqlTypeName() + ")");
indexQueryBuf.append(",");
}
- for (PColumn icol : pindex.getColumns()) {
+ for (PColumn icol :indexColumns) {
PColumn dcol = IndexUtil.getDataColumn(ptable, icol.getName().getString());
if (SchemaUtil.isPKColumn(icol) && !SchemaUtil.isPKColumn(dcol)) {
indexQueryBuf.append("CAST (\"" + icol.getName().getString() + "\" AS " + dcol.getDataType().getSqlTypeName() + ")");
indexQueryBuf.append(",");
}
}
- for (PColumn icol : pindex.getColumns()) {
+ for (PColumn icol : indexColumns) {
if (!SchemaUtil.isPKColumn(icol)) {
PColumn dcol = IndexUtil.getDataColumn(ptable, icol.getName().getString());
indexQueryBuf.append("CAST (\"" + icol.getName().getString() + "\" AS " + dcol.getDataType().getSqlTypeName() + ")");
@@ -63,11 +83,11 @@ public class IndexScrutiny {
indexQueryBuf.append("\nFROM " + fullIndexName);
StringBuilder tableQueryBuf = new StringBuilder("SELECT ");
- for (PColumn dcol : ptable.getPKColumns()) {
+ for (PColumn dcol : tablePKColumns) {
tableQueryBuf.append("\"" + dcol.getName().getString() + "\"");
tableQueryBuf.append(",");
}
- for (PColumn icol : pindex.getColumns()) {
+ for (PColumn icol : indexColumns) {
PColumn dcol = IndexUtil.getDataColumn(ptable, icol.getName().getString());
if (SchemaUtil.isPKColumn(icol) && !SchemaUtil.isPKColumn(dcol)) {
if (dcol.getFamilyName() != null) {
@@ -78,7 +98,7 @@ public class IndexScrutiny {
tableQueryBuf.append(",");
}
}
- for (PColumn icol : pindex.getColumns()) {
+ for (PColumn icol : indexColumns) {
if (!SchemaUtil.isPKColumn(icol)) {
PColumn dcol = IndexUtil.getDataColumn(ptable, icol.getName().getString());
if (dcol.getFamilyName() != null) {
@@ -91,13 +111,13 @@ public class IndexScrutiny {
}
tableQueryBuf.setLength(tableQueryBuf.length()-1);
tableQueryBuf.append("\nFROM " + fullTableName + "\nWHERE (");
- for (PColumn dcol : ptable.getPKColumns()) {
+ for (PColumn dcol : tablePKColumns) {
tableQueryBuf.append("\"" + dcol.getName().getString() + "\"");
tableQueryBuf.append(",");
}
tableQueryBuf.setLength(tableQueryBuf.length()-1);
tableQueryBuf.append(") = ((");
- for (int i = 0; i < ptable.getPKColumns().size(); i++) {
+ for (int i = 0; i < tablePKColumns.size(); i++) {
tableQueryBuf.append("?");
tableQueryBuf.append(",");
}
@@ -114,11 +134,12 @@ public class IndexScrutiny {
while (irs.next()) {
icount++;
StringBuilder pkBuf = new StringBuilder("(");
- for (int i = 0; i < ptable.getPKColumns().size(); i++) {
- PColumn dcol = ptable.getPKColumns().get(i);
- Object pkVal = irs.getObject(i+1);
- PDataType pkType = PDataType.fromTypeId(irsmd.getColumnType(i + 1));
- istmt.setObject(i+1, pkVal, dcol.getDataType().getSqlType());
+ for (int i = 0; i < tablePKColumns.size(); i++) {
+ PColumn dcol = tablePKColumns.get(i);
+ int offset = i+1;
+ Object pkVal = irs.getObject(offset);
+ PDataType pkType = PDataType.fromTypeId(irsmd.getColumnType(offset));
+ istmt.setObject(offset, pkVal, dcol.getDataType().getSqlType());
pkBuf.append(pkType.toStringLiteral(pkVal));
pkBuf.append(",");
}