You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ra...@apache.org on 2017/05/24 05:51:35 UTC
phoenix git commit: PHOENIX-3827 Make use of HBASE-15600 to write
local index mutations along with data mutations atomically (Rajeshbabu)
Repository: phoenix
Updated Branches:
refs/heads/master dfb2586af -> a2f4d7eeb
PHOENIX-3827 Make use of HBASE-15600 to write local index mutations along with data mutations atomically (Rajeshbabu)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a2f4d7ee
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a2f4d7ee
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a2f4d7ee
Branch: refs/heads/master
Commit: a2f4d7eebec621b58204a9eb78d552f18dcbcf24
Parents: dfb2586
Author: Rajeshbabu Chintaguntla <ra...@apache.org>
Authored: Wed May 24 11:21:12 2017 +0530
Committer: Rajeshbabu Chintaguntla <ra...@apache.org>
Committed: Wed May 24 11:21:12 2017 +0530
----------------------------------------------------------------------
.../end2end/IndexToolForPartialBuildIT.java | 41 +++--------
...olForPartialBuildWithNamespaceEnabledIT.java | 13 ++--
.../end2end/index/MutableIndexFailureIT.java | 6 +-
.../org/apache/phoenix/hbase/index/Indexer.java | 72 +++++++-------------
4 files changed, 45 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a2f4d7ee/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
index 59a9106..83bda64 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
@@ -64,9 +64,6 @@ import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.StringUtil;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -74,16 +71,12 @@ import com.google.common.collect.Maps;
/**
* Tests for the {@link IndexToolForPartialBuildIT}
*/
-@RunWith(Parameterized.class)
public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
- private final boolean localIndex;
protected boolean isNamespaceEnabled = false;
protected final String tableDDLOptions;
- public IndexToolForPartialBuildIT(boolean localIndex) {
-
- this.localIndex = localIndex;
+ public IndexToolForPartialBuildIT() {
StringBuilder optionBuilder = new StringBuilder();
optionBuilder.append(" SPLIT ON(1,2)");
this.tableDDLOptions = optionBuilder.toString();
@@ -108,13 +101,6 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), ReadOnlyProps.EMPTY_PROPS);
}
- @Parameters(name="localIndex = {0}")
- public static Collection<Boolean[]> data() {
- return Arrays.asList(new Boolean[][] {
- { false},{ true }
- });
- }
-
@Test
public void testSecondaryIndex() throws Exception {
String schemaName = generateUniqueName();
@@ -142,8 +128,7 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
upsertRow(stmt1, 2000);
conn.commit();
- stmt.execute(String.format("CREATE %s INDEX %s ON %s (LPAD(UPPER(NAME),11,'x')||'_xyz') ",
- (localIndex ? "LOCAL" : ""), indxTable, fullTableName));
+ stmt.execute(String.format("CREATE INDEX %s ON %s (LPAD(UPPER(NAME),11,'x')||'_xyz') ", indxTable, fullTableName));
FailingRegionObserver.FAIL_WRITE = true;
upsertRow(stmt1, 3000);
upsertRow(stmt1, 4000);
@@ -186,7 +171,7 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
String actualExplainPlan = QueryUtil.getExplainPlan(rs);
// assert we are pulling from data table.
- assertExplainPlan(actualExplainPlan, schemaName, dataTableName, null, false, isNamespaceEnabled);
+ assertExplainPlan(actualExplainPlan, schemaName, dataTableName, null, isNamespaceEnabled);
rs = stmt1.executeQuery(selectSql);
for (int i = 1; i <= 7; i++) {
@@ -219,7 +204,7 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
// assert we are pulling from index table.
rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
actualExplainPlan = QueryUtil.getExplainPlan(rs);
- assertExplainPlan(actualExplainPlan, schemaName, dataTableName, indxTable, localIndex, isNamespaceEnabled);
+ assertExplainPlan(actualExplainPlan, schemaName, dataTableName, indxTable, isNamespaceEnabled);
rs = stmt.executeQuery(selectSql);
@@ -237,21 +222,13 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
}
public static void assertExplainPlan(final String actualExplainPlan, String schemaName, String dataTable,
- String indxTable, boolean isLocal, boolean isNamespaceMapped) {
+ String indxTable, boolean isNamespaceMapped) {
String expectedExplainPlan = "";
if (indxTable != null) {
- if (isLocal) {
- final String localIndexName = SchemaUtil
- .getPhysicalHBaseTableName(SchemaUtil.getTableName(schemaName, dataTable), isNamespaceMapped,
- PTableType.INDEX)
- .getString();
- expectedExplainPlan = String.format("CLIENT PARALLEL 3-WAY RANGE SCAN OVER %s [1]", localIndexName);
- } else {
- expectedExplainPlan = String.format("CLIENT PARALLEL 1-WAY FULL SCAN OVER %s",
- SchemaUtil.getPhysicalHBaseTableName(SchemaUtil.getTableName(schemaName, indxTable),
- isNamespaceMapped, PTableType.INDEX));
- }
+ expectedExplainPlan = String.format("CLIENT PARALLEL 1-WAY FULL SCAN OVER %s",
+ SchemaUtil.getPhysicalHBaseTableName(SchemaUtil.getTableName(schemaName, indxTable),
+ isNamespaceMapped, PTableType.INDEX));
} else {
expectedExplainPlan = String.format("CLIENT PARALLEL 1-WAY FULL SCAN OVER %s",
SchemaUtil.getPhysicalHBaseTableName(SchemaUtil.getTableName(schemaName, dataTable),
@@ -270,7 +247,7 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
args.add(dataTable);
args.add("-pr");
args.add("-op");
- args.add("/tmp/output/partialTable_"+localIndex);
+ args.add("/tmp/output/partialTable_");
return args.toArray(new String[0]);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a2f4d7ee/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java
index a8c1f1e..02c2d93 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java
@@ -24,6 +24,8 @@ import java.util.Map;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.ReadOnlyProps;
import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import com.google.common.collect.Maps;
@@ -31,11 +33,12 @@ import com.google.common.collect.Maps;
/**
* Tests for the {@link IndexToolForPartialBuildWithNamespaceEnabled}
*/
+@RunWith(Parameterized.class)
public class IndexToolForPartialBuildWithNamespaceEnabledIT extends IndexToolForPartialBuildIT {
-
- public IndexToolForPartialBuildWithNamespaceEnabledIT(boolean localIndex, boolean isNamespaceEnabled) {
- super(localIndex);
+
+ public IndexToolForPartialBuildWithNamespaceEnabledIT(boolean isNamespaceEnabled) {
+ super();
this.isNamespaceEnabled=isNamespaceEnabled;
}
@@ -49,10 +52,10 @@ public class IndexToolForPartialBuildWithNamespaceEnabledIT extends IndexToolFor
setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
}
- @Parameters(name="localIndex = {0} , isNamespaceEnabled = {1}")
+ @Parameters(name="isNamespaceEnabled = {0}")
public static Collection<Boolean[]> data() {
return Arrays.asList(new Boolean[][] {
- { false, true},{ true, false }
+ { true },{ false }
});
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a2f4d7ee/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index 853647e..07f587d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -234,7 +234,7 @@ public class MutableIndexFailureIT extends BaseTest {
assertTrue(rs.next());
assertEquals(indexName, rs.getString(3));
// the index is only disabled for non-txn tables upon index table write failure
- if (transactional || leaveIndexActiveOnFailure) {
+ if (transactional || leaveIndexActiveOnFailure || localIndex) {
assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
} else {
String indexState = rs.getString("INDEX_STATE");
@@ -413,7 +413,7 @@ public class MutableIndexFailureIT extends BaseTest {
stmt.execute();
try {
conn.commit();
- if (commitShouldFail) {
+ if (commitShouldFail && !localIndex) {
fail();
}
} catch (CommitException e) {
@@ -434,7 +434,7 @@ public class MutableIndexFailureIT extends BaseTest {
stmt.execute();
try {
conn.commit();
- if (commitShouldFail) {
+ if (commitShouldFail && !localIndex) {
fail();
}
} catch (CommitException e) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a2f4d7ee/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index 15e53a3..04deddb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -23,8 +23,8 @@ import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -96,6 +96,11 @@ import com.google.common.collect.Multimap;
* nothing does. Currently, we do not support mixed-durability updates within a single batch. If you
* want to have different durability levels, you only need to split the updates into two different
* batches.
+ * <p>
+ * We don't need to implement {@link #postPut(ObserverContext, Put, WALEdit, Durability)} and
+ * {@link #postDelete(ObserverContext, Delete, WALEdit, Durability)} hooks because
+ * Phoenix always does batch mutations.
+ * <p>
*/
public class Indexer extends BaseRegionObserver {
@@ -225,10 +230,8 @@ public class Indexer extends BaseRegionObserver {
if (!mutations.isEmpty()) {
Region region = e.getEnvironment().getRegion();
// Otherwise, submit the mutations directly here
- region.mutateRowsWithLocks(
- mutations,
- Collections.<byte[]>emptyList(), // Rows are already locked
- HConstants.NO_NONCE, HConstants.NO_NONCE);
+ region.batchMutate(mutations.toArray(new Mutation[0]), HConstants.NO_NONCE,
+ HConstants.NO_NONCE);
}
return Result.EMPTY_RESULT;
} catch (Throwable t) {
@@ -320,14 +323,26 @@ public class Indexer extends BaseRegionObserver {
if (current == null) {
current = NullSpan.INSTANCE;
}
-
// get the index updates for all elements in this batch
Collection<Pair<Mutation, byte[]>> indexUpdates =
this.builder.getIndexUpdate(miniBatchOp, mutations.values());
-
current.addTimelineAnnotation("Built index updates, doing preStep");
TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());
-
+ byte[] tableName = c.getEnvironment().getRegion().getTableDesc().getTableName().getName();
+ Iterator<Pair<Mutation, byte[]>> indexUpdatesItr = indexUpdates.iterator();
+ List<Mutation> localUpdates = new ArrayList<Mutation>(indexUpdates.size());
+ while(indexUpdatesItr.hasNext()) {
+ Pair<Mutation, byte[]> next = indexUpdatesItr.next();
+ if (Bytes.compareTo(next.getSecond(), tableName) == 0) {
+ localUpdates.add(next.getFirst());
+ indexUpdatesItr.remove();
+ }
+ }
+ if (!localUpdates.isEmpty()) {
+ miniBatchOp.addOperationsFromCP(0,
+ localUpdates.toArray(new Mutation[localUpdates.size()]));
+ }
+
// write them, either to WAL or the index tables
doPre(indexUpdates, edit, durability);
}
@@ -366,26 +381,6 @@ public class Indexer extends BaseRegionObserver {
}
@Override
- public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
- final Durability durability) throws IOException {
- if (this.disabled) {
- super.postPut(e, put, edit, durability);
- return;
- }
- doPost(edit, put, durability);
- }
-
- @Override
- public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete,
- WALEdit edit, final Durability durability) throws IOException {
- if (this.disabled) {
- super.postDelete(e, delete, edit, durability);
- return;
- }
- doPost(edit, delete, durability);
- }
-
- @Override
public void postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success) throws IOException {
if (this.disabled) {
@@ -454,25 +449,8 @@ public class Indexer extends BaseRegionObserver {
// references originally - therefore, we just pass in a null factory here and use the ones
// already specified on each reference
try {
- current.addTimelineAnnotation("Actually doing index update for first time");
- Collection<Pair<Mutation, byte[]>> localUpdates =
- new ArrayList<Pair<Mutation, byte[]>>();
- Collection<Pair<Mutation, byte[]>> remoteUpdates =
- new ArrayList<Pair<Mutation, byte[]>>();
- for (Pair<Mutation, byte[]> mutation : indexUpdates) {
- if (Bytes.toString(mutation.getSecond()).equals(
- environment.getRegion().getTableDesc().getNameAsString())) {
- localUpdates.add(mutation);
- } else {
- remoteUpdates.add(mutation);
- }
- }
- if(!remoteUpdates.isEmpty()) {
- writer.writeAndKillYourselfOnFailure(remoteUpdates, false);
- }
- if(!localUpdates.isEmpty()) {
- writer.writeAndKillYourselfOnFailure(localUpdates, true);
- }
+ current.addTimelineAnnotation("Actually doing index update for first time");
+ writer.writeAndKillYourselfOnFailure(indexUpdates, false);
} finally { // With a custom kill policy, we may throw instead of kill the server.
// Without doing this in a finally block (at least with the mini cluster),
// the region server never goes down.