You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by la...@apache.org on 2017/04/20 00:41:29 UTC
phoenix git commit: Apply Local Indexes batch updates only once.
Repository: phoenix
Updated Branches:
refs/heads/master ee886bab9 -> 5bd7f79b5
Apply Local Indexes batch updates only once.
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/5bd7f79b
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/5bd7f79b
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/5bd7f79b
Branch: refs/heads/master
Commit: 5bd7f79b51309505a19f854d05cb000f5cd1eb9f
Parents: ee886ba
Author: Lars Hofhansl <la...@apache.org>
Authored: Wed Apr 19 17:41:00 2017 -0700
Committer: Lars Hofhansl <la...@apache.org>
Committed: Wed Apr 19 17:41:00 2017 -0700
----------------------------------------------------------------------
.../phoenix/end2end/index/LocalIndexIT.java | 30 ++++++++
.../org/apache/phoenix/hbase/index/Indexer.java | 73 +++++++-------------
2 files changed, 56 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5bd7f79b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
index a7d0028..8d3316b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
@@ -617,6 +617,36 @@ public class LocalIndexIT extends BaseLocalIndexIT {
}
}
+ @Test
+ public void testLocalGlobalIndexMix() throws Exception {
+ if (isNamespaceMapped) { return; }
+ String tableName = generateUniqueName();
+ Connection conn1 = DriverManager.getConnection(getUrl());
+ String ddl = "CREATE TABLE " + tableName + " (t_id VARCHAR NOT NULL,\n" +
+ "k1 INTEGER NOT NULL,\n" +
+ "k2 INTEGER NOT NULL,\n" +
+ "k3 INTEGER,\n" +
+ "v1 VARCHAR,\n" +
+ "v2 VARCHAR,\n" +
+ "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))\n";
+ conn1.createStatement().execute(ddl);
+ conn1.createStatement().execute("CREATE LOCAL INDEX LV1 ON " + tableName + "(v1)");
+ conn1.createStatement().execute("CREATE INDEX GV2 ON " + tableName + "(v2)");
+
+ conn1.createStatement().execute("UPSERT INTO " + tableName + " values('b',1,2,4,'z','3')");
+ conn1.createStatement().execute("UPSERT INTO " + tableName + " values('f',1,2,3,'a','0')");
+ conn1.createStatement().execute("UPSERT INTO " + tableName + " values('j',2,4,2,'a','2')");
+ conn1.createStatement().execute("UPSERT INTO " + tableName + " values('q',3,1,1,'c','1')");
+ conn1.commit();
+ ResultSet rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + tableName + " WHERE v1 = 'c'");
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + tableName + " WHERE v2 = '2'");
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ conn1.close();
+ }
+
private void copyLocalIndexHFiles(Configuration conf, HRegionInfo fromRegion, HRegionInfo toRegion, boolean move)
throws IOException {
Path root = FSUtils.getRootDir(conf);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5bd7f79b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index de98051..9fc76e9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -372,7 +372,7 @@ public class Indexer extends BaseRegionObserver {
super.postPut(e, put, edit, durability);
return;
}
- doPost(edit, put, durability, true, false);
+ doPost(edit, put, durability);
}
@Override
@@ -382,29 +382,10 @@ public class Indexer extends BaseRegionObserver {
super.postDelete(e, delete, edit, durability);
return;
}
- doPost(edit, delete, durability, true, false);
+ doPost(edit, delete, durability);
}
@Override
- public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
- MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
- if (this.disabled) {
- super.postBatchMutate(c, miniBatchOp);
- return;
- }
- WALEdit edit = miniBatchOp.getWalEdit(0);
- if (edit != null) {
- IndexedKeyValue ikv = getFirstIndexedKeyValue(edit);
- if (ikv != null) {
- // This will prevent the postPut and postDelete hooks from doing anything
- // We need to do this now, as the postBatchMutateIndispensably (where the
- // actual index writing gets done) is called after the postPut and postDelete.
- ikv.markBatchFinished();
- }
- }
- }
-
- @Override
public void postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success) throws IOException {
if (this.disabled) {
@@ -417,17 +398,13 @@ public class Indexer extends BaseRegionObserver {
//each batch operation, only the first one will have anything useful, so we can just grab that
Mutation mutation = miniBatchOp.getOperation(0);
WALEdit edit = miniBatchOp.getWalEdit(0);
- // We're forcing the index writes here because we've marked the index batch as "finished"
- // to prevent postPut and postDelete from doing anything, but hold off on writing them
- // until now so we're outside of the MVCC lock (see PHOENIX-3789). Without this hacky
- // forceWrite flag, we'd ignore them again here too.
- doPost(edit, mutation, mutation.getDurability(), false, true);
+ doPost(edit, mutation, mutation.getDurability());
}
}
- private void doPost(WALEdit edit, Mutation m, final Durability durability, boolean allowLocalUpdates, boolean forceWrite) throws IOException {
+ private void doPost(WALEdit edit, Mutation m, final Durability durability) throws IOException {
try {
- doPostWithExceptions(edit, m, durability, allowLocalUpdates, forceWrite);
+ doPostWithExceptions(edit, m, durability);
return;
} catch (Throwable e) {
rethrowIndexingException(e);
@@ -436,7 +413,7 @@ public class Indexer extends BaseRegionObserver {
"Somehow didn't complete the index update, but didn't return succesfully either!");
}
- private void doPostWithExceptions(WALEdit edit, Mutation m, final Durability durability, boolean allowLocalUpdates, boolean forceWrite)
+ private void doPostWithExceptions(WALEdit edit, Mutation m, final Durability durability)
throws Exception {
//short circuit, if we don't need to do any work
if (durability == Durability.SKIP_WAL || !this.builder.isEnabled(m) || edit == null) {
@@ -470,30 +447,32 @@ public class Indexer extends BaseRegionObserver {
* once (this hook gets called with the same WALEdit for each Put/Delete in a batch, which can
* lead to writing all the index updates for each Put/Delete).
*/
- if ((!ikv.getBatchFinished() || forceWrite) || allowLocalUpdates) {
+ if (!ikv.getBatchFinished()) {
Collection<Pair<Mutation, byte[]>> indexUpdates = extractIndexUpdate(edit);
// the WAL edit is kept in memory and we already specified the factory when we created the
// references originally - therefore, we just pass in a null factory here and use the ones
// already specified on each reference
try {
- if (!ikv.getBatchFinished() || forceWrite) {
- current.addTimelineAnnotation("Actually doing index update for first time");
- writer.writeAndKillYourselfOnFailure(indexUpdates, allowLocalUpdates);
- } else if (allowLocalUpdates) {
- Collection<Pair<Mutation, byte[]>> localUpdates =
- new ArrayList<Pair<Mutation, byte[]>>();
- current.addTimelineAnnotation("Actually doing local index update for first time");
- for (Pair<Mutation, byte[]> mutation : indexUpdates) {
- if (Bytes.toString(mutation.getSecond()).equals(
- environment.getRegion().getTableDesc().getNameAsString())) {
- localUpdates.add(mutation);
- }
- }
- if(!localUpdates.isEmpty()) {
- writer.writeAndKillYourselfOnFailure(localUpdates, allowLocalUpdates);
- }
- }
+ current.addTimelineAnnotation("Actually doing index update for first time");
+ Collection<Pair<Mutation, byte[]>> localUpdates =
+ new ArrayList<Pair<Mutation, byte[]>>();
+ Collection<Pair<Mutation, byte[]>> remoteUpdates =
+ new ArrayList<Pair<Mutation, byte[]>>();
+ for (Pair<Mutation, byte[]> mutation : indexUpdates) {
+ if (Bytes.toString(mutation.getSecond()).equals(
+ environment.getRegion().getTableDesc().getNameAsString())) {
+ localUpdates.add(mutation);
+ } else {
+ remoteUpdates.add(mutation);
+ }
+ }
+ if(!remoteUpdates.isEmpty()) {
+ writer.writeAndKillYourselfOnFailure(remoteUpdates, false);
+ }
+ if(!localUpdates.isEmpty()) {
+ writer.writeAndKillYourselfOnFailure(localUpdates, true);
+ }
} finally { // With a custom kill policy, we may throw instead of kill the server.
// Without doing this in a finally block (at least with the mini cluster),
// the region server never goes down.