Posted to commits@phoenix.apache.org by ja...@apache.org on 2015/05/21 00:23:19 UTC

phoenix git commit: Start transaction correctly for immutable indexes

Repository: phoenix
Updated Branches:
  refs/heads/txn c7700b41d -> 46eb25c14


Start transaction correctly for immutable indexes


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/46eb25c1
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/46eb25c1
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/46eb25c1

Branch: refs/heads/txn
Commit: 46eb25c14bb868d5fb0e9c1e363b12677822edca
Parents: c7700b4
Author: James Taylor <jt...@salesforce.com>
Authored: Wed May 20 15:23:13 2015 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed May 20 15:23:13 2015 -0700

----------------------------------------------------------------------
 .../end2end/index/TxImmutableIndexIT.java       | 87 ++++++++++++++++++++
 .../apache/phoenix/execute/MutationState.java   | 43 ++++++----
 .../apache/phoenix/index/IndexMaintainer.java   |  5 ++
 .../index/PhoenixTransactionalIndexer.java      | 35 +-------
 .../java/org/apache/phoenix/util/IndexUtil.java |  7 +-
 5 files changed, 130 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
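
For context, the scenario this patch targets looks like the following from a plain JDBC
client: a transactional table with an immutable index receives an upsert that is then
rolled back, and both the data row and its index row are expected to disappear. This is
a minimal sketch only, mirroring the new TxImmutableIndexIT added below; the
TRANSACTIONAL=true table property and the connection URL are assumptions here (the IT
instead enables transactions globally via QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB).

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class ImmutableIndexRollbackSketch {
        public static void main(String[] args) throws Exception {
            // Assumed connection URL; adjust to the local cluster.
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
                conn.setAutoCommit(false);
                Statement stmt = conn.createStatement();
                // TRANSACTIONAL=true is an assumption on this branch; the IT relies on
                // the default-transactional config property instead.
                stmt.execute("CREATE TABLE DEMO (v1 VARCHAR PRIMARY KEY, v2 VARCHAR, v3 VARCHAR) "
                        + "IMMUTABLE_ROWS=true, TRANSACTIONAL=true");
                stmt.execute("CREATE INDEX DEMO_IDX ON DEMO (v2) INCLUDE (v3)");

                stmt.executeUpdate("UPSERT INTO DEMO VALUES ('x', 'y', 'a')");
                conn.rollback();

                // After rollback neither the data table nor DEMO_IDX should return the
                // row; the new IT below verifies the data-table side of this.
                ResultSet rs = stmt.executeQuery("SELECT v1, v2, v3 FROM DEMO");
                if (rs.next()) {
                    throw new AssertionError("row should have been rolled back");
                }
            }
        }
    }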


http://git-wip-us.apache.org/repos/asf/phoenix/blob/46eb25c1/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxImmutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxImmutableIndexIT.java
new file mode 100644
index 0000000..4c77ea3
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxImmutableIndexIT.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.end2end.Shadower;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class TxImmutableIndexIT extends ImmutableIndexIT {
+    
+    @BeforeClass
+    @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
+    public static void doSetup() throws Exception {
+        Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+        // Make tables transactional by default so the tests inherited from
+        // ImmutableIndexIT run against the transactional code path
+        props.put(QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB, Boolean.toString(true));
+        // We need this b/c we don't allow a transactional table to be created if the underlying
+        // HBase table already exists (since we don't know if it was transactional before).
+        props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+    
+    @Test
+    public void testRollbackOfUncommittedIndexChange() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(false);
+        try {
+            Statement stmt = conn.createStatement();
+            stmt.execute("CREATE TABLE DEMO(v1 VARCHAR PRIMARY KEY, v2 VARCHAR, v3 VARCHAR) IMMUTABLE_ROWS=true");
+            stmt.execute("CREATE INDEX DEMO_idx ON DEMO (v2) INCLUDE(v3)");
+            
+            stmt.executeUpdate("upsert into DEMO values('x', 'y', 'a')");
+            
+            //assert values in data table
+            ResultSet rs = stmt.executeQuery("select v1, v2, v3 from DEMO");
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("y", rs.getString(2));
+            assertEquals("a", rs.getString(3));
+            assertFalse(rs.next());
+            
+            conn.rollback();
+            
+            //assert that the row is gone from the data table after rollback
+            rs = stmt.executeQuery("select v1, v2, v3 from DEMO");
+            assertFalse(rs.next());
+            
+        } finally {
+            conn.close();
+        }
+    }
+}
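
A check that is not part of this commit, but which could be added to the test above to
confirm the index itself was cleaned up: filter on the indexed column so Phoenix serves
the query from DEMO_IDX, and optionally match the EXPLAIN output against the index name.
The helper below is hypothetical, and the plan-string match is an assumption about how
the explain output names the index table.

    import static org.junit.Assert.assertFalse;
    import static org.junit.Assert.assertTrue;

    import java.sql.ResultSet;
    import java.sql.Statement;

    class IndexRollbackChecks {
        // Hypothetical helper, callable after conn.rollback() in the test above.
        static void assertIndexRolledBack(Statement stmt) throws Exception {
            // Filtering on the indexed column v2 lets the optimizer use DEMO_IDX.
            ResultSet rs = stmt.executeQuery("SELECT v3 FROM DEMO WHERE v2 = 'y'");
            assertFalse(rs.next());

            // Confirm the plan really reads the index table.
            ResultSet plan = stmt.executeQuery("EXPLAIN SELECT v3 FROM DEMO WHERE v2 = 'y'");
            assertTrue(plan.next());
            assertTrue(plan.getString(1).contains("DEMO_IDX"));
        }
    }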

http://git-wip-us.apache.org/repos/asf/phoenix/blob/46eb25c1/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 4f1a2cd..1c3f130 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -94,7 +94,6 @@ public class MutationState implements SQLCloseable {
 
     private PhoenixConnection connection;
     private final long maxSize;
-    private final ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
     // map from table to rows 
     //   rows - map from rowkey to columns
     //      columns - map from column to value
@@ -264,6 +263,7 @@ public class MutationState implements SQLCloseable {
         final List<Mutation> mutations = Lists.newArrayListWithExpectedSize(values.size());
         final List<Mutation> mutationsPertainingToIndex = indexes.hasNext() ? Lists.<Mutation>newArrayListWithExpectedSize(values.size()) : null;
         Iterator<Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>>> iterator = values.entrySet().iterator();
+        final ImmutableBytesWritable ptr = new ImmutableBytesWritable();
         while (iterator.hasNext()) {
             Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry = iterator.next();
             ImmutableBytesPtr key = rowEntry.getKey();
@@ -305,7 +305,7 @@ public class MutationState implements SQLCloseable {
                 try {
                     indexMutations =
                             IndexUtil.generateIndexData(tableRef.getTable(), index, mutationsPertainingToIndex,
-                                tempPtr, connection.getKeyValueBuilder(), connection);
+                                    ptr, connection.getKeyValueBuilder(), connection);
                 } catch (SQLException e) {
                     throw new IllegalDataException(e);
                 }
@@ -473,8 +473,9 @@ public class MutationState implements SQLCloseable {
                         divideImmutableIndexes(enabledIndexes, table, rowKeyIndexes, keyValueIndexes);
                         // Generate index deletes for immutable indexes that only reference row key
                         // columns and submit directly here.
+                        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
                         for (PTable index : rowKeyIndexes) {
-                            List<Delete> indexDeletes = IndexUtil.generateDeleteIndexData(table, index, deletes, tempPtr, connection.getKeyValueBuilder(), connection);
+                            List<Delete> indexDeletes = IndexUtil.generateDeleteIndexData(table, index, deletes, ptr, connection.getKeyValueBuilder(), connection);
                             HTableInterface hindex = connection.getQueryServices().getTable(index.getPhysicalName().getBytes());
                             hindex.delete(indexDeletes);
                         }
@@ -511,6 +512,7 @@ public class MutationState implements SQLCloseable {
         // add tracing for this operation
         TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables");
         Span span = trace.getSpan();
+        ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable();
         while (tableRefIterator.hasNext()) {
             TableRef tableRef = tableRefIterator.next();
             Map<ImmutableBytesPtr,Map<PColumn,byte[]>> valuesMap = mutations.get(tableRef);
@@ -518,7 +520,7 @@ public class MutationState implements SQLCloseable {
                 continue;
             }
             PTable table = tableRef.getTable();
-            boolean hasIndexMaintainers = table.getIndexMaintainers(tempPtr, connection);
+            table.getIndexMaintainers(indexMetaDataPtr, connection);
             boolean isDataTable = true;
             // Validate as we go if transactional since we can undo if a problem occurs (which is unlikely)
             long serverTimestamp = serverTimeStamps == null ? validate(tableRef, valuesMap) : serverTimeStamps[i++];
@@ -536,8 +538,8 @@ public class MutationState implements SQLCloseable {
                 boolean shouldRetry = false;
                 do {
                     ServerCache cache = null;
-                    if (hasIndexMaintainers && isDataTable) {
-                        cache = setMetaDataOnMutations(tableRef, mutations, tempPtr);
+                    if (isDataTable) {
+                        cache = setMetaDataOnMutations(tableRef, mutations, indexMetaDataPtr);
                     }
                 
                     // If we haven't retried yet, retry for this case only, as it's possible that
@@ -558,7 +560,11 @@ public class MutationState implements SQLCloseable {
                             // Don't add immutable indexes (those are the only ones that would participate
                             // during a commit), as we don't need conflict detection for these.
                             if (isDataTable) {
+                                // Even for immutable, we need to do this so that an abort has the state
+                                // necessary to generate the rows to delete.
                                 addTxParticipant(txnAware);
+                            } else {
+                                txnAware.startTx(getTransaction());
                             }
                             hTable = txnAware;
                         }
@@ -635,18 +641,23 @@ public class MutationState implements SQLCloseable {
         byte[] tenantId = connection.getTenantId() == null ? null : connection.getTenantId().getBytes();
         ServerCache cache = null;
         byte[] attribValue = null;
-        byte[] uuidValue;
+        byte[] uuidValue = null;
         byte[] txState = ByteUtil.EMPTY_BYTE_ARRAY;
         if (table.isTransactional()) {
             txState = TransactionUtil.encodeTxnState(getTransaction());
         }
-        if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, indexMetaDataPtr.getLength() + txState.length)) {
-            IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
-            cache = client.addIndexMetadataCache(mutations, indexMetaDataPtr, txState);
-            uuidValue = cache.getId();
-        } else {
-            attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
-            uuidValue = ServerCacheClient.generateId();
+        boolean hasIndexMetaData = indexMetaDataPtr.getLength() > 0;
+        if (hasIndexMetaData) {
+            if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, indexMetaDataPtr.getLength() + txState.length)) {
+                IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
+                cache = client.addIndexMetadataCache(mutations, indexMetaDataPtr, txState);
+                uuidValue = cache.getId();
+            } else {
+                attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
+                uuidValue = ServerCacheClient.generateId();
+            }
+        } else if (txState.length == 0) {
+            return null;
         }
         // Either set the UUID to be able to access the index metadata from the cache
         // or set the index metadata directly on the Mutation
@@ -657,6 +668,10 @@ public class MutationState implements SQLCloseable {
             mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
             if (attribValue != null) {
                 mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+                if (txState.length > 0) {
+                    mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
+                }
+            } else if (!hasIndexMetaData && txState.length > 0) {
                 mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
             }
         }
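
The hunk above draws a line between two kinds of transactional HTable wrappers: the data
table is registered as a full transaction participant via addTxParticipant(), so Tephra
tracks its change set for conflict detection and rollback, while an immutable index only
gets startTx(), so its writes carry the transaction's write pointer without joining
conflict detection. Below is a minimal Tephra-level sketch of that distinction; it
assumes a TransactionSystemClient and the two HTableInterface handles are already
available, the hbase98 compatibility package name depends on the HBase version in use,
and the row key and column qualifiers are purely illustrative.

    import org.apache.hadoop.hbase.client.HTableInterface;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    import co.cask.tephra.TransactionContext;
    import co.cask.tephra.TransactionSystemClient;
    import co.cask.tephra.hbase98.TransactionAwareHTable;

    public class TxParticipantSketch {
        static void writeDataAndImmutableIndex(TransactionSystemClient txClient,
                                               HTableInterface dataHTable,
                                               HTableInterface indexHTable) throws Exception {
            TransactionAwareHTable txDataTable = new TransactionAwareHTable(dataHTable);
            TransactionAwareHTable txIndexTable = new TransactionAwareHTable(indexHTable);

            // Only the data table is a participant: its change set is tracked for
            // conflict detection and rollback.
            TransactionContext context = new TransactionContext(txClient, txDataTable);
            context.start();
            try {
                // The immutable index just needs the current write pointer; it is
                // intentionally NOT added to the context (no conflict detection).
                txIndexTable.startTx(context.getCurrentTransaction());

                txDataTable.put(new Put(Bytes.toBytes("x"))
                        .add(Bytes.toBytes("0"), Bytes.toBytes("V2"), Bytes.toBytes("y")));
                txIndexTable.put(new Put(Bytes.toBytes("y\u0000x"))   // illustrative index row key
                        .add(Bytes.toBytes("0"), Bytes.toBytes("0:V3"), Bytes.toBytes("a")));

                context.finish();   // commit
            } catch (Exception e) {
                // Abort rolls back the tracked (data table) change set; in Phoenix the
                // rollback of the data rows is what drives deletion of the matching
                // immutable index rows (see the rollback handling in the hunks below).
                context.abort();
                throw e;
            }
        }
    }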

http://git-wip-us.apache.org/repos/asf/phoenix/blob/46eb25c1/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index be668d6..aaaf685 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -200,6 +200,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         int nMutableIndexes = indexMetaDataPtr.getLength() == 0 ? 0 : ByteUtil.vintFromBytes(indexMetaDataPtr);
         int nIndexes = nMutableIndexes + keyValueIndexes.size();
         int estimatedSize = indexMetaDataPtr.getLength() + 1; // Just in case new size increases buffer
+        if (indexMetaDataPtr.getLength() == 0) {
+            estimatedSize += table.getRowKeySchema().getEstimatedByteSize();
+        }
         for (PTable index : keyValueIndexes) {
             estimatedSize += index.getIndexMaintainer(table, connection).getEstimatedByteSize();
         }
@@ -212,6 +215,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             // as its still included
             if (indexMetaDataPtr.getLength() > 0) {
                 output.write(indexMetaDataPtr.get(), indexMetaDataPtr.getOffset(), indexMetaDataPtr.getLength()-WritableUtils.getVIntSize(nMutableIndexes));
+            } else {
+                table.getRowKeySchema().write(output);
             }
             // Serialize mutable indexes afterwards
             for (PTable index : keyValueIndexes) {
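
For readers following the serialization change: the code above appends immutable
(key-value) index maintainers to whatever index metadata is already attached to the
mutations, and when nothing was attached (no mutable indexes) it now also writes the
data table's RowKeySchema up front, which is why the estimated size grows by the
schema's byte size. A rough sketch of the resulting layout follows, with the schema and
maintainer bytes stood in by opaque byte arrays; the helper and its parameters are
hypothetical and the layout is approximate.

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.io.WritableUtils;

    public class IndexMetaDataLayoutSketch {
        /*
         * Approximate on-the-wire shape after this change:
         *   vint  total index count (pre-existing mutable + appended key-value maintainers)
         *   ...   RowKeySchema bytes, only when no metadata existed before,
         *         otherwise the prior metadata bytes minus their own count vint
         *   ...   each appended key-value index maintainer
         */
        static byte[] layout(int totalIndexCount,
                             byte[] schemaOrPriorMetaData,
                             byte[][] appendedMaintainers) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            WritableUtils.writeVInt(out, totalIndexCount);
            out.write(schemaOrPriorMetaData);        // stands in for RowKeySchema.write(out)
            for (byte[] maintainer : appendedMaintainers) {
                out.write(maintainer);               // stands in for maintainer serialization
            }
            out.flush();
            return bytes.toByteArray();
        }
    }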

http://git-wip-us.apache.org/repos/asf/phoenix/blob/46eb25c1/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 862c4ba..cfe0058 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -30,8 +30,6 @@ import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Result;
@@ -44,7 +42,6 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.hbase.index.MultiMutation;
@@ -69,7 +66,6 @@ import org.cloudera.htrace.Span;
 import org.cloudera.htrace.Trace;
 import org.cloudera.htrace.TraceScope;
 
-import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -133,11 +129,12 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
             MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
 
         Mutation m = miniBatchOp.getOperation(0);
-        if (!codec.isEnabled(m) || m.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null) {
+        if (!codec.isEnabled(m)) {
             super.preBatchMutate(c, miniBatchOp);
             return;
         }
 
+        boolean readOwnWrites = m.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null;
         Map<String,byte[]> updateAttributes = m.getAttributesMap();
         PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(c.getEnvironment(),updateAttributes);
         Collection<Pair<Mutation, byte[]>> indexUpdates = null;
@@ -149,7 +146,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
             }
 
             // get the index updates for all elements in this batch
-            indexUpdates = getIndexUpdates(c.getEnvironment(), indexMetaData, getMutationIterator(miniBatchOp), false);
+            indexUpdates = getIndexUpdates(c.getEnvironment(), indexMetaData, getMutationIterator(miniBatchOp), readOwnWrites);
 
             current.addTimelineAnnotation("Built index updates, doing preStep");
             TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());
@@ -188,32 +185,6 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
         return s;
     }
 
-    @Override
-    public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> e, final Delete delete,
-        final WALEdit edit, final Durability durability) throws IOException {
-        
-        // Need to do this in preDelete as otherwise our scan won't see the old values unless
-        // we do a raw scan.
-        if (delete.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) == null || !codec.isEnabled(delete)) {
-            super.preDelete(e, delete, edit, durability);
-            return;
-        }
-        Map<String,byte[]> updateAttributes = delete.getAttributesMap();
-        PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(e.getEnvironment(),updateAttributes);
-        Collection<Pair<Mutation, byte[]>> indexUpdates = null;
-        try {
-            indexUpdates = getIndexUpdates(e.getEnvironment(), indexMetaData, Iterators.<Mutation>singletonIterator(delete), true);
-            // no index updates, so we are done
-            if (!indexUpdates.isEmpty()) {
-                this.writer.write(indexUpdates);
-            }
-        } catch (Throwable t) {
-            String msg = "Failed to rollback index updates: " + indexUpdates;
-            LOG.error(msg, t);
-            ServerUtil.throwIOException(msg, t);
-        }
-    }
-
     private Collection<Pair<Mutation, byte[]>> getIndexUpdates(RegionCoprocessorEnvironment env, PhoenixIndexMetaData indexMetaData, Iterator<Mutation> mutationIterator, boolean readOwnWrites) throws IOException {
         ResultScanner scanner = null;
         TransactionAwareHTable txTable = null;
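
With the preDelete() hook removed above, rollback handling is folded into
preBatchMutate(): a delete issued during transaction rollback is recognized purely by
the presence of the Tephra TX_ROLLBACK_ATTRIBUTE_KEY attribute, which flips the index
update scan into read-own-writes mode. A small sketch of that tagging and detection
pattern follows; the attribute value written here is an assumption, since the code
above only checks for presence.

    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.util.Bytes;

    import co.cask.tephra.TxConstants;

    public class RollbackAttributeSketch {
        // Rollback side: tag a delete so the transactional indexer treats it as a rollback.
        static Delete markAsRollback(byte[] rowKey) {
            Delete delete = new Delete(rowKey);
            delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, Bytes.toBytes(true)); // value is illustrative
            return delete;
        }

        // Server side: mirrors the readOwnWrites check in preBatchMutate() above.
        static boolean isRollback(Mutation m) {
            return m.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null;
        }
    }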

http://git-wip-us.apache.org/repos/asf/phoenix/blob/46eb25c1/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index fd111e0..51428ea 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -27,6 +27,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import co.cask.tephra.TxConstants;
+
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
@@ -219,7 +221,10 @@ public class IndexUtil {
             for (final Mutation dataMutation : dataMutations) {
                 long ts = MetaDataUtil.getClientTimeStamp(dataMutation);
                 ptr.set(dataMutation.getRow());
-                indexMutations.add(maintainer.buildDeleteMutation(kvBuilder, ptr, ts));
+                Delete delete = maintainer.buildDeleteMutation(kvBuilder, ptr, ts);
+                // TODO: move to TransactionUtil
+                delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, dataMutation.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY));
+                indexMutations.add(delete);
             }
             return indexMutations;
         } catch (IOException e) {