You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2015/05/21 00:23:19 UTC
phoenix git commit: Start transaction correctly for immutable indexes
Repository: phoenix
Updated Branches:
refs/heads/txn c7700b41d -> 46eb25c14
Start transaction correctly for immutable indexes
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/46eb25c1
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/46eb25c1
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/46eb25c1
Branch: refs/heads/txn
Commit: 46eb25c14bb868d5fb0e9c1e363b12677822edca
Parents: c7700b4
Author: James Taylor <jt...@salesforce.com>
Authored: Wed May 20 15:23:13 2015 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed May 20 15:23:13 2015 -0700
----------------------------------------------------------------------
.../end2end/index/TxImmutableIndexIT.java | 87 ++++++++++++++++++++
.../apache/phoenix/execute/MutationState.java | 43 ++++++----
.../apache/phoenix/index/IndexMaintainer.java | 5 ++
.../index/PhoenixTransactionalIndexer.java | 35 +-------
.../java/org/apache/phoenix/util/IndexUtil.java | 7 +-
5 files changed, 130 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/46eb25c1/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxImmutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxImmutableIndexIT.java
new file mode 100644
index 0000000..4c77ea3
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxImmutableIndexIT.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.end2end.Shadower;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class TxImmutableIndexIT extends ImmutableIndexIT {
+
+ @BeforeClass
+ @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
+ public static void doSetup() throws Exception {
+ Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+ // Don't split intra region so we can more easily know that the n-way parallelization is for the explain plan
+ // Forces server cache to be used
+ props.put(QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB, Boolean.toString(true));
+ // We need this b/c we don't allow a transactional table to be created if the underlying
+ // HBase table already exists (since we don't know if it was transactional before).
+ props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
+ @Test
+ public void testRollbackOfUncommittedIndexChange() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ conn.setAutoCommit(false);
+ try {
+ Statement stmt = conn.createStatement();
+ stmt.execute("CREATE TABLE DEMO(v1 VARCHAR PRIMARY KEY, v2 VARCHAR, v3 VARCHAR) IMMUTABLE_ROWS=true");
+ stmt.execute("CREATE INDEX DEMO_idx ON DEMO (v2) INCLUDE(v3)");
+
+ stmt.executeUpdate("upsert into DEMO values('x', 'y', 'a')");
+
+ //assert values in data table
+ ResultSet rs = stmt.executeQuery("select v1, v2, v3 from DEMO");
+ assertTrue(rs.next());
+ assertEquals("x", rs.getString(1));
+ assertEquals("y", rs.getString(2));
+ assertEquals("a", rs.getString(3));
+ assertFalse(rs.next());
+
+ conn.rollback();
+
+ //assert values in data table
+ rs = stmt.executeQuery("select v1, v2, v3 from DEMO");
+ assertFalse(rs.next());
+
+ } finally {
+ conn.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/46eb25c1/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 4f1a2cd..1c3f130 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -94,7 +94,6 @@ public class MutationState implements SQLCloseable {
private PhoenixConnection connection;
private final long maxSize;
- private final ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
// map from table to rows
// rows - map from rowkey to columns
// columns - map from column to value
@@ -264,6 +263,7 @@ public class MutationState implements SQLCloseable {
final List<Mutation> mutations = Lists.newArrayListWithExpectedSize(values.size());
final List<Mutation> mutationsPertainingToIndex = indexes.hasNext() ? Lists.<Mutation>newArrayListWithExpectedSize(values.size()) : null;
Iterator<Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>>> iterator = values.entrySet().iterator();
+ final ImmutableBytesWritable ptr = new ImmutableBytesWritable();
while (iterator.hasNext()) {
Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry = iterator.next();
ImmutableBytesPtr key = rowEntry.getKey();
@@ -305,7 +305,7 @@ public class MutationState implements SQLCloseable {
try {
indexMutations =
IndexUtil.generateIndexData(tableRef.getTable(), index, mutationsPertainingToIndex,
- tempPtr, connection.getKeyValueBuilder(), connection);
+ ptr, connection.getKeyValueBuilder(), connection);
} catch (SQLException e) {
throw new IllegalDataException(e);
}
@@ -473,8 +473,9 @@ public class MutationState implements SQLCloseable {
divideImmutableIndexes(enabledIndexes, table, rowKeyIndexes, keyValueIndexes);
// Generate index deletes for immutable indexes that only reference row key
// columns and submit directly here.
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
for (PTable index : rowKeyIndexes) {
- List<Delete> indexDeletes = IndexUtil.generateDeleteIndexData(table, index, deletes, tempPtr, connection.getKeyValueBuilder(), connection);
+ List<Delete> indexDeletes = IndexUtil.generateDeleteIndexData(table, index, deletes, ptr, connection.getKeyValueBuilder(), connection);
HTableInterface hindex = connection.getQueryServices().getTable(index.getPhysicalName().getBytes());
hindex.delete(indexDeletes);
}
@@ -511,6 +512,7 @@ public class MutationState implements SQLCloseable {
// add tracing for this operation
TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables");
Span span = trace.getSpan();
+ ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable();
while (tableRefIterator.hasNext()) {
TableRef tableRef = tableRefIterator.next();
Map<ImmutableBytesPtr,Map<PColumn,byte[]>> valuesMap = mutations.get(tableRef);
@@ -518,7 +520,7 @@ public class MutationState implements SQLCloseable {
continue;
}
PTable table = tableRef.getTable();
- boolean hasIndexMaintainers = table.getIndexMaintainers(tempPtr, connection);
+ table.getIndexMaintainers(indexMetaDataPtr, connection);
boolean isDataTable = true;
// Validate as we go if transactional since we can undo if a problem occurs (which is unlikely)
long serverTimestamp = serverTimeStamps == null ? validate(tableRef, valuesMap) : serverTimeStamps[i++];
@@ -536,8 +538,8 @@ public class MutationState implements SQLCloseable {
boolean shouldRetry = false;
do {
ServerCache cache = null;
- if (hasIndexMaintainers && isDataTable) {
- cache = setMetaDataOnMutations(tableRef, mutations, tempPtr);
+ if (isDataTable) {
+ cache = setMetaDataOnMutations(tableRef, mutations, indexMetaDataPtr);
}
// If we haven't retried yet, retry for this case only, as it's possible that
@@ -558,7 +560,11 @@ public class MutationState implements SQLCloseable {
// Don't add immutable indexes (those are the only ones that would participate
// during a commit), as we don't need conflict detection for these.
if (isDataTable) {
+ // Even for immutable, we need to do this so that an abort has the state
+ // necessary to generate the rows to delete.
addTxParticipant(txnAware);
+ } else {
+ txnAware.startTx(getTransaction());
}
hTable = txnAware;
}
@@ -635,18 +641,23 @@ public class MutationState implements SQLCloseable {
byte[] tenantId = connection.getTenantId() == null ? null : connection.getTenantId().getBytes();
ServerCache cache = null;
byte[] attribValue = null;
- byte[] uuidValue;
+ byte[] uuidValue = null;
byte[] txState = ByteUtil.EMPTY_BYTE_ARRAY;
if (table.isTransactional()) {
txState = TransactionUtil.encodeTxnState(getTransaction());
}
- if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, indexMetaDataPtr.getLength() + txState.length)) {
- IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
- cache = client.addIndexMetadataCache(mutations, indexMetaDataPtr, txState);
- uuidValue = cache.getId();
- } else {
- attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
- uuidValue = ServerCacheClient.generateId();
+ boolean hasIndexMetaData = indexMetaDataPtr.getLength() > 0;
+ if (hasIndexMetaData) {
+ if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, indexMetaDataPtr.getLength() + txState.length)) {
+ IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
+ cache = client.addIndexMetadataCache(mutations, indexMetaDataPtr, txState);
+ uuidValue = cache.getId();
+ } else {
+ attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
+ uuidValue = ServerCacheClient.generateId();
+ }
+ } else if (txState.length == 0) {
+ return null;
}
// Either set the UUID to be able to access the index metadata from the cache
// or set the index metadata directly on the Mutation
@@ -657,6 +668,10 @@ public class MutationState implements SQLCloseable {
mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
if (attribValue != null) {
mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+ if (txState.length > 0) {
+ mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
+ }
+ } else if (!hasIndexMetaData && txState.length > 0) {
mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/46eb25c1/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index be668d6..aaaf685 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -200,6 +200,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
int nMutableIndexes = indexMetaDataPtr.getLength() == 0 ? 0 : ByteUtil.vintFromBytes(indexMetaDataPtr);
int nIndexes = nMutableIndexes + keyValueIndexes.size();
int estimatedSize = indexMetaDataPtr.getLength() + 1; // Just in case new size increases buffer
+ if (indexMetaDataPtr.getLength() == 0) {
+ estimatedSize += table.getRowKeySchema().getEstimatedByteSize();
+ }
for (PTable index : keyValueIndexes) {
estimatedSize += index.getIndexMaintainer(table, connection).getEstimatedByteSize();
}
@@ -212,6 +215,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
// as its still included
if (indexMetaDataPtr.getLength() > 0) {
output.write(indexMetaDataPtr.get(), indexMetaDataPtr.getOffset(), indexMetaDataPtr.getLength()-WritableUtils.getVIntSize(nMutableIndexes));
+ } else {
+ table.getRowKeySchema().write(output);
}
// Serialize mutable indexes afterwards
for (PTable index : keyValueIndexes) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/46eb25c1/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 862c4ba..cfe0058 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -30,8 +30,6 @@ import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
@@ -44,7 +42,6 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.hbase.index.MultiMutation;
@@ -69,7 +66,6 @@ import org.cloudera.htrace.Span;
import org.cloudera.htrace.Trace;
import org.cloudera.htrace.TraceScope;
-import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -133,11 +129,12 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
Mutation m = miniBatchOp.getOperation(0);
- if (!codec.isEnabled(m) || m.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null) {
+ if (!codec.isEnabled(m)) {
super.preBatchMutate(c, miniBatchOp);
return;
}
+ boolean readOwnWrites = m.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null;
Map<String,byte[]> updateAttributes = m.getAttributesMap();
PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(c.getEnvironment(),updateAttributes);
Collection<Pair<Mutation, byte[]>> indexUpdates = null;
@@ -149,7 +146,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
}
// get the index updates for all elements in this batch
- indexUpdates = getIndexUpdates(c.getEnvironment(), indexMetaData, getMutationIterator(miniBatchOp), false);
+ indexUpdates = getIndexUpdates(c.getEnvironment(), indexMetaData, getMutationIterator(miniBatchOp), readOwnWrites);
current.addTimelineAnnotation("Built index updates, doing preStep");
TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());
@@ -188,32 +185,6 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
return s;
}
- @Override
- public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> e, final Delete delete,
- final WALEdit edit, final Durability durability) throws IOException {
-
- // Need to do this in preDelete as otherwise our scan won't see the old values unless
- // we do a raw scan.
- if (delete.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) == null || !codec.isEnabled(delete)) {
- super.preDelete(e, delete, edit, durability);
- return;
- }
- Map<String,byte[]> updateAttributes = delete.getAttributesMap();
- PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(e.getEnvironment(),updateAttributes);
- Collection<Pair<Mutation, byte[]>> indexUpdates = null;
- try {
- indexUpdates = getIndexUpdates(e.getEnvironment(), indexMetaData, Iterators.<Mutation>singletonIterator(delete), true);
- // no index updates, so we are done
- if (!indexUpdates.isEmpty()) {
- this.writer.write(indexUpdates);
- }
- } catch (Throwable t) {
- String msg = "Failed to rollback index updates: " + indexUpdates;
- LOG.error(msg, t);
- ServerUtil.throwIOException(msg, t);
- }
- }
-
private Collection<Pair<Mutation, byte[]>> getIndexUpdates(RegionCoprocessorEnvironment env, PhoenixIndexMetaData indexMetaData, Iterator<Mutation> mutationIterator, boolean readOwnWrites) throws IOException {
ResultScanner scanner = null;
TransactionAwareHTable txTable = null;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/46eb25c1/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index fd111e0..51428ea 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -27,6 +27,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import co.cask.tephra.TxConstants;
+
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
@@ -219,7 +221,10 @@ public class IndexUtil {
for (final Mutation dataMutation : dataMutations) {
long ts = MetaDataUtil.getClientTimeStamp(dataMutation);
ptr.set(dataMutation.getRow());
- indexMutations.add(maintainer.buildDeleteMutation(kvBuilder, ptr, ts));
+ Delete delete = maintainer.buildDeleteMutation(kvBuilder, ptr, ts);
+ // TODO: move to TransactionUtil
+ delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, dataMutation.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY));
+ indexMutations.add(delete);
}
return indexMutations;
} catch (IOException e) {