You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2015/04/19 03:31:20 UTC
phoenix git commit: Add isValid check for Put index updates for
consistency
Repository: phoenix
Updated Branches:
refs/heads/txn 974329cd1 -> 433495482
Add isValid check for Put index updates for consistency
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/43349548
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/43349548
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/43349548
Branch: refs/heads/txn
Commit: 4334954829246d20b5a67ce6bf6b0abb1a7164af
Parents: 974329c
Author: James Taylor <jt...@salesforce.com>
Authored: Sat Apr 18 18:31:13 2015 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Sat Apr 18 18:31:13 2015 -0700
----------------------------------------------------------------------
.../apache/phoenix/index/PhoenixIndexCodec.java | 84 --------------------
.../index/PhoenixTransactionalIndexer.java | 16 ++--
2 files changed, 10 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/43349548/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
index 956e5ea..109de84 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
@@ -19,7 +19,6 @@ import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.hbase.index.builder.BaseIndexCodec;
@@ -29,7 +28,6 @@ import org.apache.phoenix.hbase.index.covered.IndexUpdate;
import org.apache.phoenix.hbase.index.covered.TableState;
import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
-import org.apache.phoenix.util.MetaDataUtil;
import com.google.common.collect.Lists;
@@ -65,32 +63,16 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
if (indexMaintainers.get(0).isRowDeleted(state.getPendingUpdate())) {
return Collections.emptyList();
}
- // TODO: confirm that this special case isn't needed
- // (as state should match this with the above call, since there are no mutable columns)
- /*
- if (maintainer.isImmutableRows()) {
- indexUpdate = new IndexUpdate(new ColumnTracker(maintainer.getAllColumns()));
- indexUpdate.setTable(maintainer.getIndexTableName());
- valueGetter = maintainer.createGetterFromKeyValues(dataRowKey, state.getPendingUpdate());
- }
- */
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
ptr.set(state.getCurrentRowKey());
List<IndexUpdate> indexUpdates = Lists.newArrayList();
for (IndexMaintainer maintainer : indexMaintainers) {
- if (maintainer.isLocalIndex()) { // TODO: remove this once verified assert passes
- assert(Bytes.compareTo(maintainer.getIndexTableName(), MetaDataUtil.getLocalIndexPhysicalName(env.getRegion().getTableDesc().getName())) == 0);
- }
Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns());
ValueGetter valueGetter = statePair.getFirst();
IndexUpdate indexUpdate = statePair.getSecond();
indexUpdate.setTable(maintainer.getIndexTableName());
Put put = maintainer.buildUpdateMutation(KV_BUILDER, valueGetter, ptr, state.getCurrentTimestamp(), env
.getRegion().getStartKey(), env.getRegion().getEndKey());
- if (put == null) {
- throw new IllegalStateException("Null put for " + env.getRegion().getRegionInfo().getTable().getNameAsString()
- + ": " + Bytes.toStringBinary(ptr.get(), ptr.getOffset(), ptr.getLength()));
- }
indexUpdate.setUpdate(put);
indexUpdates.add(indexUpdate);
}
@@ -104,9 +86,6 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
ptr.set(state.getCurrentRowKey());
List<IndexUpdate> indexUpdates = Lists.newArrayList();
for (IndexMaintainer maintainer : indexMaintainers) {
- if (maintainer.isLocalIndex()) { // TODO: remove this once verified assert passes
- assert(Bytes.compareTo(maintainer.getIndexTableName(), MetaDataUtil.getLocalIndexPhysicalName(env.getRegion().getTableDesc().getName())) == 0);
- }
if (maintainer.isImmutableRows()) {
continue;
}
@@ -122,69 +101,6 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
return indexUpdates;
}
- /*
- @Override
- public Iterable<IndexUpdate> getIndexUpserts(TableState state, BatchContext context) throws IOException {
- return getIndexUpdates(state, context, true);
- }
-
- @Override
- public Iterable<IndexUpdate> getIndexDeletes(TableState state, BatchContext context) throws IOException {
- return getIndexUpdates(state, context, false);
- }
-
- private Iterable<IndexUpdate> getIndexUpdates(TableState state, BatchContext context, boolean upsert) throws IOException {
- List<IndexMaintainer> indexMaintainers = ((PhoenixBatchContext)context).getIndexMetaData().getIndexMaintainers();
- if (indexMaintainers.isEmpty()) { return Collections.emptyList(); }
- List<IndexUpdate> indexUpdates = Lists.newArrayList();
- ImmutableBytesWritable ptr = new ImmutableBytesWritable();
- // TODO: state.getCurrentRowKey() should take an ImmutableBytesWritable arg to prevent byte copy
- byte[] dataRowKey = state.getCurrentRowKey();
- ptr.set(dataRowKey);
- byte[] localIndexTableName = MetaDataUtil.getLocalIndexPhysicalName(env.getRegion().getTableDesc().getName());
- ValueGetter valueGetter = null;
- Scanner scanner = null;
- for (IndexMaintainer maintainer : indexMaintainers) {
- if (upsert) {
- // Short-circuit building state when we know it's a row deletion
- if (maintainer.isRowDeleted(state.getPendingUpdate())) {
- continue;
- }
- }
- IndexUpdate indexUpdate = null;
- if (maintainer.isImmutableRows()) {
- indexUpdate = new IndexUpdate(new ColumnTracker(maintainer.getAllColumns()));
- if (maintainer.isLocalIndex()) {
- indexUpdate.setTable(localIndexTableName);
- } else {
- indexUpdate.setTable(maintainer.getIndexTableName());
- }
- valueGetter = maintainer.createGetterFromKeyValues(dataRowKey, state.getPendingUpdate());
- } else {
- Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns());
- valueGetter = statePair.getFirst();
- indexUpdate = statePair.getSecond();
- indexUpdate.setTable(maintainer.getIndexTableName());
- }
- Mutation mutation = null;
- if (upsert) {
- mutation = maintainer.buildUpdateMutation(KV_BUILDER, valueGetter, ptr, state.getCurrentTimestamp(), env
- .getRegion().getStartKey(), env.getRegion().getEndKey());
- } else {
- mutation = maintainer.buildDeleteMutation(KV_BUILDER, valueGetter, ptr, state.getPendingUpdate(),
- state.getCurrentTimestamp(), env.getRegion().getStartKey(), env.getRegion().getEndKey());
- }
- indexUpdate.setUpdate(mutation);
- if (scanner != null) {
- scanner.close();
- scanner = null;
- }
- indexUpdates.add(indexUpdate);
- }
- return indexUpdates;
- }
- */
-
@Override
public boolean isEnabled(Mutation m) throws IOException {
return hasIndexMaintainers(m.getAttributesMap());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/43349548/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index d77f7e6..6a13552 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -195,18 +195,22 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
indexUpdates.add(new Pair<Mutation, byte[]>(delete.getUpdate(),delete.getTableName()));
}
state.applyMutation();
- Iterable<IndexUpdate> updates = codec.getIndexUpserts(state, indexMetaData);
- for (IndexUpdate update : updates) {
- indexUpdates.add(new Pair<Mutation, byte[]>(update.getUpdate(),update.getTableName()));
+ Iterable<IndexUpdate> puts = codec.getIndexUpserts(state, indexMetaData);
+ for (IndexUpdate put : puts) {
+ if (put.isValid()) {
+ indexUpdates.add(new Pair<Mutation, byte[]>(put.getUpdate(),put.getTableName()));
+ }
}
}
}
for (Mutation m : mutations.values()) {
TxTableState state = new TxTableState(env, mutableColumns, updateAttributes, tx.getWritePointer(), m);
state.applyMutation();
- Iterable<IndexUpdate> updates = codec.getIndexUpserts(state, indexMetaData);
- for (IndexUpdate update : updates) {
- indexUpdates.add(new Pair<Mutation, byte[]>(update.getUpdate(),update.getTableName()));
+ Iterable<IndexUpdate> puts = codec.getIndexUpserts(state, indexMetaData);
+ for (IndexUpdate put : puts) {
+ if (put.isValid()) {
+ indexUpdates.add(new Pair<Mutation, byte[]>(put.getUpdate(),put.getTableName()));
+ }
}
}
} finally {