You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2015/04/19 03:31:20 UTC

phoenix git commit: Add isValid check for Put index updates for consistency

Repository: phoenix
Updated Branches:
  refs/heads/txn 974329cd1 -> 433495482


Add isValid check for Put index updates for consistency


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/43349548
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/43349548
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/43349548

Branch: refs/heads/txn
Commit: 4334954829246d20b5a67ce6bf6b0abb1a7164af
Parents: 974329c
Author: James Taylor <jt...@salesforce.com>
Authored: Sat Apr 18 18:31:13 2015 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Sat Apr 18 18:31:13 2015 -0700

----------------------------------------------------------------------
 .../apache/phoenix/index/PhoenixIndexCodec.java | 84 --------------------
 .../index/PhoenixTransactionalIndexer.java      | 16 ++--
 2 files changed, 10 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/43349548/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
index 956e5ea..109de84 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
@@ -19,7 +19,6 @@ import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.builder.BaseIndexCodec;
@@ -29,7 +28,6 @@ import org.apache.phoenix.hbase.index.covered.IndexUpdate;
 import org.apache.phoenix.hbase.index.covered.TableState;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
-import org.apache.phoenix.util.MetaDataUtil;
 
 import com.google.common.collect.Lists;
 
@@ -65,32 +63,16 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
         if (indexMaintainers.get(0).isRowDeleted(state.getPendingUpdate())) {
             return Collections.emptyList();
         }
-        // TODO: confirm that this special case isn't needed
-        // (as state should match this with the above call, since there are no mutable columns)
-        /*
-        if (maintainer.isImmutableRows()) {
-            indexUpdate = new IndexUpdate(new ColumnTracker(maintainer.getAllColumns()));
-            indexUpdate.setTable(maintainer.getIndexTableName());
-            valueGetter = maintainer.createGetterFromKeyValues(dataRowKey, state.getPendingUpdate());
-        }
-        */
         ImmutableBytesWritable ptr = new ImmutableBytesWritable();
         ptr.set(state.getCurrentRowKey());
         List<IndexUpdate> indexUpdates = Lists.newArrayList();
         for (IndexMaintainer maintainer : indexMaintainers) {
-            if (maintainer.isLocalIndex()) { // TODO: remove this once verified assert passes
-                assert(Bytes.compareTo(maintainer.getIndexTableName(), MetaDataUtil.getLocalIndexPhysicalName(env.getRegion().getTableDesc().getName())) == 0);
-            }
             Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns());
             ValueGetter valueGetter = statePair.getFirst();
             IndexUpdate indexUpdate = statePair.getSecond();
             indexUpdate.setTable(maintainer.getIndexTableName());
             Put put = maintainer.buildUpdateMutation(KV_BUILDER, valueGetter, ptr, state.getCurrentTimestamp(), env
                     .getRegion().getStartKey(), env.getRegion().getEndKey());
-            if (put == null) {
-                throw new IllegalStateException("Null put for " + env.getRegion().getRegionInfo().getTable().getNameAsString() 
-                        + ": " + Bytes.toStringBinary(ptr.get(), ptr.getOffset(), ptr.getLength()));
-            }
             indexUpdate.setUpdate(put);
             indexUpdates.add(indexUpdate);
         }
@@ -104,9 +86,6 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
         ptr.set(state.getCurrentRowKey());
         List<IndexUpdate> indexUpdates = Lists.newArrayList();
         for (IndexMaintainer maintainer : indexMaintainers) {
-            if (maintainer.isLocalIndex()) { // TODO: remove this once verified assert passes
-                assert(Bytes.compareTo(maintainer.getIndexTableName(), MetaDataUtil.getLocalIndexPhysicalName(env.getRegion().getTableDesc().getName())) == 0);
-            }
             if (maintainer.isImmutableRows()) {
                 continue;
             }
@@ -122,69 +101,6 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
         return indexUpdates;
     }
 
-    /*
-    @Override
-    public Iterable<IndexUpdate> getIndexUpserts(TableState state, BatchContext context) throws IOException {
-        return getIndexUpdates(state, context, true);
-    }
-
-    @Override
-    public Iterable<IndexUpdate> getIndexDeletes(TableState state, BatchContext context) throws IOException {
-        return getIndexUpdates(state, context, false);
-    }
-
-    private Iterable<IndexUpdate> getIndexUpdates(TableState state, BatchContext context, boolean upsert) throws IOException {
-        List<IndexMaintainer> indexMaintainers = ((PhoenixBatchContext)context).getIndexMetaData().getIndexMaintainers();
-        if (indexMaintainers.isEmpty()) { return Collections.emptyList(); }
-        List<IndexUpdate> indexUpdates = Lists.newArrayList();
-        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-        // TODO: state.getCurrentRowKey() should take an ImmutableBytesWritable arg to prevent byte copy
-        byte[] dataRowKey = state.getCurrentRowKey();
-        ptr.set(dataRowKey);
-        byte[] localIndexTableName = MetaDataUtil.getLocalIndexPhysicalName(env.getRegion().getTableDesc().getName());
-        ValueGetter valueGetter = null;
-        Scanner scanner = null;
-        for (IndexMaintainer maintainer : indexMaintainers) {
-            if (upsert) {
-                // Short-circuit building state when we know it's a row deletion
-                if (maintainer.isRowDeleted(state.getPendingUpdate())) {
-                    continue;
-                }
-            }
-            IndexUpdate indexUpdate = null;
-            if (maintainer.isImmutableRows()) {
-                indexUpdate = new IndexUpdate(new ColumnTracker(maintainer.getAllColumns()));
-                if (maintainer.isLocalIndex()) {
-                    indexUpdate.setTable(localIndexTableName);
-                } else {
-                    indexUpdate.setTable(maintainer.getIndexTableName());
-                }
-                valueGetter = maintainer.createGetterFromKeyValues(dataRowKey, state.getPendingUpdate());
-            } else {
-                Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns());
-                valueGetter = statePair.getFirst();
-                indexUpdate = statePair.getSecond();
-                indexUpdate.setTable(maintainer.getIndexTableName());
-            }
-            Mutation mutation = null;
-            if (upsert) {
-                mutation = maintainer.buildUpdateMutation(KV_BUILDER, valueGetter, ptr, state.getCurrentTimestamp(), env
-                        .getRegion().getStartKey(), env.getRegion().getEndKey());
-            } else {
-                mutation = maintainer.buildDeleteMutation(KV_BUILDER, valueGetter, ptr, state.getPendingUpdate(),
-                        state.getCurrentTimestamp(), env.getRegion().getStartKey(), env.getRegion().getEndKey());
-            }
-            indexUpdate.setUpdate(mutation);
-            if (scanner != null) {
-                scanner.close();
-                scanner = null;
-            }
-            indexUpdates.add(indexUpdate);
-        }
-        return indexUpdates;
-    }
-    */
-
     @Override
     public boolean isEnabled(Mutation m) throws IOException {
         return hasIndexMaintainers(m.getAttributesMap());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/43349548/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index d77f7e6..6a13552 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -195,18 +195,22 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
                     		indexUpdates.add(new Pair<Mutation, byte[]>(delete.getUpdate(),delete.getTableName()));
                     }
                     state.applyMutation();
-                    Iterable<IndexUpdate> updates = codec.getIndexUpserts(state, indexMetaData);
-                    for (IndexUpdate update : updates) {
-                        indexUpdates.add(new Pair<Mutation, byte[]>(update.getUpdate(),update.getTableName()));
+                    Iterable<IndexUpdate> puts = codec.getIndexUpserts(state, indexMetaData);
+                    for (IndexUpdate put : puts) {
+                        if (put.isValid()) {
+                            indexUpdates.add(new Pair<Mutation, byte[]>(put.getUpdate(),put.getTableName()));
+                        }
                     }
                 }
             }
             for (Mutation m : mutations.values()) {
                 TxTableState state = new TxTableState(env, mutableColumns, updateAttributes, tx.getWritePointer(), m);
                 state.applyMutation();
-                Iterable<IndexUpdate> updates = codec.getIndexUpserts(state, indexMetaData);
-                for (IndexUpdate update : updates) {
-                    indexUpdates.add(new Pair<Mutation, byte[]>(update.getUpdate(),update.getTableName()));
+                Iterable<IndexUpdate> puts = codec.getIndexUpserts(state, indexMetaData);
+                for (IndexUpdate put : puts) {
+                    if (put.isValid()) {
+                        indexUpdates.add(new Pair<Mutation, byte[]>(put.getUpdate(),put.getTableName()));
+                    }
                 }
             }
         } finally {