You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by td...@apache.org on 2017/11/29 02:38:28 UTC
phoenix git commit: Revert "PHOENIX-4386 Calculate the estimatedSize
of MutationState using Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations (addendum)"
Repository: phoenix
Updated Branches:
refs/heads/master d46d4e564 -> 355ee522c
Revert "PHOENIX-4386 Calculate the estimatedSize of MutationState using Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations (addendum)"
This reverts commit 4e0c0a33ed8b401f7785dde8979041dd5ab9a1f4.
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/355ee522
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/355ee522
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/355ee522
Branch: refs/heads/master
Commit: 355ee522c1d4ff07cf9fbb0a9a01e43e3f702730
Parents: d46d4e5
Author: Thomas D'Silva <td...@apache.org>
Authored: Tue Nov 28 18:37:55 2017 -0800
Committer: Thomas D'Silva <td...@apache.org>
Committed: Tue Nov 28 18:37:55 2017 -0800
----------------------------------------------------------------------
.../apache/phoenix/execute/PartialCommitIT.java | 5 +-
.../apache/phoenix/compile/DeleteCompiler.java | 11 +-
.../apache/phoenix/compile/UpsertCompiler.java | 7 +-
.../apache/phoenix/execute/MutationState.java | 127 +++++++------------
.../java/org/apache/phoenix/util/IndexUtil.java | 4 +-
.../org/apache/phoenix/util/KeyValueUtil.java | 5 +-
6 files changed, 61 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/355ee522/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
index e5b57e3..10fd7f8 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
@@ -33,6 +33,7 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@@ -51,8 +52,8 @@ import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.end2end.BaseOwnClusterIT;
-import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
import org.apache.phoenix.hbase.index.Indexer;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.monitoring.GlobalMetric;
import org.apache.phoenix.monitoring.MetricType;
@@ -284,7 +285,7 @@ public class PartialCommitIT extends BaseOwnClusterIT {
private PhoenixConnection getConnectionWithTableOrderPreservingMutationState() throws SQLException {
Connection con = driver.connect(url, new Properties());
PhoenixConnection phxCon = new PhoenixConnection(con.unwrap(PhoenixConnection.class));
- final Map<TableRef, MultiRowMutationState> mutations = Maps.newTreeMap(new TableRefComparator());
+ final Map<TableRef,Map<ImmutableBytesPtr,MutationState.RowMutationState>> mutations = Maps.newTreeMap(new TableRefComparator());
// passing a null mutation state forces the connection.newMutationState() to be used to create the MutationState
return new PhoenixConnection(phxCon, null) {
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/355ee522/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index a06e2ca..f9ca300 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hbase.Cell;
@@ -42,7 +43,6 @@ import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.AggregatePlan;
import org.apache.phoenix.execute.MutationState;
-import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
import org.apache.phoenix.execute.MutationState.RowMutationState;
import org.apache.phoenix.filter.SkipScanFilter;
import org.apache.phoenix.hbase.index.ValueGetter;
@@ -91,6 +91,7 @@ import org.apache.phoenix.util.ScanUtil;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.sun.istack.NotNull;
public class DeleteCompiler {
@@ -120,14 +121,14 @@ public class DeleteCompiler {
final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
final int maxSizeBytes = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE_BYTES);
final int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
- MultiRowMutationState mutations = new MultiRowMutationState(batchSize);
- List<MultiRowMutationState> indexMutations = null;
+ Map<ImmutableBytesPtr,RowMutationState> mutations = Maps.newHashMapWithExpectedSize(batchSize);
+ List<Map<ImmutableBytesPtr,RowMutationState>> indexMutations = null;
// If indexTableRef is set, we're deleting the rows from both the index table and
// the data table through a single query to save executing an additional one.
if (!otherTableRefs.isEmpty()) {
indexMutations = Lists.newArrayListWithExpectedSize(otherTableRefs.size());
for (int i = 0; i < otherTableRefs.size(); i++) {
- indexMutations.add(new MultiRowMutationState(batchSize));
+ indexMutations.add(Maps.<ImmutableBytesPtr,RowMutationState>newHashMapWithExpectedSize(batchSize));
}
}
List<PColumn> pkColumns = table.getPKColumns();
@@ -643,7 +644,7 @@ public class DeleteCompiler {
// keys for our ranges
ScanRanges ranges = context.getScanRanges();
Iterator<KeyRange> iterator = ranges.getPointLookupKeyIterator();
- MultiRowMutationState mutation = new MultiRowMutationState(ranges.getPointLookupCount());
+ Map<ImmutableBytesPtr,RowMutationState> mutation = Maps.newHashMapWithExpectedSize(ranges.getPointLookupCount());
while (iterator.hasNext()) {
mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()),
new RowMutationState(PRow.DELETE_MARKER, 0,
http://git-wip-us.apache.org/repos/asf/phoenix/blob/355ee522/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index a81a427..a51fd4c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -47,7 +47,6 @@ import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.AggregatePlan;
import org.apache.phoenix.execute.MutationState;
-import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
import org.apache.phoenix.execute.MutationState.RowMutationState;
import org.apache.phoenix.execute.MutationState.RowTimestampColInfo;
import org.apache.phoenix.expression.Determinism;
@@ -117,7 +116,7 @@ import com.google.common.collect.Sets;
public class UpsertCompiler {
private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes,
- PTable table, MultiRowMutationState mutation,
+ PTable table, Map<ImmutableBytesPtr, RowMutationState> mutation,
PhoenixStatement statement, boolean useServerTimestamp, IndexMaintainer maintainer,
byte[][] viewConstants, byte[] onDupKeyBytes, int numSplColumns) throws SQLException {
long columnValueSize = 0;
@@ -198,7 +197,7 @@ public class UpsertCompiler {
}
}
int rowCount = 0;
- MultiRowMutationState mutation = new MultiRowMutationState(batchSize);
+ Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize);
PTable table = tableRef.getTable();
IndexMaintainer indexMaintainer = null;
byte[][] viewConstants = null;
@@ -1178,7 +1177,7 @@ public class UpsertCompiler {
throw new IllegalStateException();
}
}
- MultiRowMutationState mutation = new MultiRowMutationState(1);
+ Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(1);
IndexMaintainer indexMaintainer = null;
byte[][] viewConstants = null;
if (table.getIndexType() == IndexType.LOCAL) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/355ee522/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 7462baa..b5a55b8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -123,7 +123,7 @@ public class MutationState implements SQLCloseable {
private final long batchSize;
private final long batchSizeBytes;
private long batchCount = 0L;
- private final Map<TableRef, MultiRowMutationState> mutations;
+ private final Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations;
private final Set<String> uncommittedPhysicalNames = Sets.newHashSetWithExpectedSize(10);
private long sizeOffset;
@@ -131,7 +131,7 @@ public class MutationState implements SQLCloseable {
private long estimatedSize = 0;
private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
private boolean isExternalTxContext = false;
- private Map<TableRef, MultiRowMutationState> txMutations = Collections.emptyMap();
+ private Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations = Collections.emptyMap();
final PhoenixTransactionContext phoenixTransactionContext;
@@ -159,12 +159,12 @@ public class MutationState implements SQLCloseable {
}
private MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, boolean subTask, PhoenixTransactionContext txContext, long sizeOffset) {
- this(maxSize, maxSizeBytes, connection, Maps.<TableRef, MultiRowMutationState>newHashMapWithExpectedSize(5), subTask, txContext);
+ this(maxSize, maxSizeBytes, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(5), subTask, txContext);
this.sizeOffset = sizeOffset;
}
MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection,
- Map<TableRef, MultiRowMutationState> mutations,
+ Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations,
boolean subTask, PhoenixTransactionContext txContext) {
this.maxSize = maxSize;
this.maxSizeBytes = maxSizeBytes;
@@ -189,7 +189,7 @@ public class MutationState implements SQLCloseable {
}
}
- public MutationState(TableRef table, MultiRowMutationState mutations, long sizeOffset, long maxSize, long maxSizeBytes, PhoenixConnection connection) throws SQLException {
+ public MutationState(TableRef table, Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long maxSize, long maxSizeBytes, PhoenixConnection connection) throws SQLException {
this(maxSize, maxSizeBytes, connection, false, null, sizeOffset);
if (!mutations.isEmpty()) {
this.mutations.put(table, mutations);
@@ -350,7 +350,7 @@ public class MutationState implements SQLCloseable {
}
public static MutationState emptyMutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection) {
- MutationState state = new MutationState(maxSize, maxSizeBytes, connection, Collections.<TableRef, MultiRowMutationState>emptyMap(), false, null);
+ MutationState state = new MutationState(maxSize, maxSizeBytes, connection, Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap(), false, null);
state.sizeOffset = 0;
return state;
}
@@ -372,12 +372,12 @@ public class MutationState implements SQLCloseable {
return sizeOffset + numRows;
}
- private void joinMutationState(TableRef tableRef, MultiRowMutationState srcRows,
- Map<TableRef, MultiRowMutationState> dstMutations) {
+ private void joinMutationState(TableRef tableRef, Map<ImmutableBytesPtr,RowMutationState> srcRows,
+ Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> dstMutations) {
PTable table = tableRef.getTable();
boolean isIndex = table.getType() == PTableType.INDEX;
boolean incrementRowCount = dstMutations == this.mutations;
- MultiRowMutationState existingRows = dstMutations.put(tableRef, srcRows);
+ Map<ImmutableBytesPtr,RowMutationState> existingRows = dstMutations.put(tableRef, srcRows);
if (existingRows != null) { // Rows for that table already exist
// Loop through new rows and replace existing with new
for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : srcRows.entrySet()) {
@@ -389,12 +389,8 @@ public class MutationState implements SQLCloseable {
Map<PColumn,byte[]> newRow = rowEntry.getValue().getColumnValues();
// if new row is PRow.DELETE_MARKER, it means delete, and we don't need to merge it with existing row.
if (newRow != PRow.DELETE_MARKER) {
- // decrement estimated size by the size of the old row
- estimatedSize-=existingRowMutationState.calculateEstimatedSize();
// Merge existing column values with new column values
existingRowMutationState.join(rowEntry.getValue());
- // increment estimated size by the size of the new row
- estimatedSize+=existingRowMutationState.calculateEstimatedSize();
// Now that the existing row has been merged with the new row, replace it back
// again (since it was merged with the new one above).
existingRows.put(rowEntry.getKey(), existingRowMutationState);
@@ -403,8 +399,6 @@ public class MutationState implements SQLCloseable {
} else {
if (incrementRowCount && !isIndex) { // Don't count index rows in row count
numRows++;
- // increment estimated size by the size of the new row
- estimatedSize += rowEntry.getValue().calculateEstimatedSize();
}
}
}
@@ -412,25 +406,22 @@ public class MutationState implements SQLCloseable {
dstMutations.put(tableRef, existingRows);
} else {
// Size new map at batch size as that's what it'll likely grow to.
- MultiRowMutationState newRows = new MultiRowMutationState(connection.getMutateBatchSize());
+ Map<ImmutableBytesPtr,RowMutationState> newRows = Maps.newHashMapWithExpectedSize(connection.getMutateBatchSize());
newRows.putAll(srcRows);
dstMutations.put(tableRef, newRows);
if (incrementRowCount && !isIndex) {
numRows += srcRows.size();
- // if we added all the rows from newMutationState we can just increment the
- // estimatedSize by newMutationState.estimatedSize
- estimatedSize += srcRows.estimatedSize;
}
}
}
- private void joinMutationState(Map<TableRef, MultiRowMutationState> srcMutations,
- Map<TableRef, MultiRowMutationState> dstMutations) {
+ private void joinMutationState(Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> srcMutations,
+ Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> dstMutations) {
// Merge newMutation with this one, keeping state from newMutation for any overlaps
- for (Map.Entry<TableRef, MultiRowMutationState> entry : srcMutations.entrySet()) {
+ for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : srcMutations.entrySet()) {
// Replace existing entries for the table with new entries
TableRef tableRef = entry.getKey();
- MultiRowMutationState srcRows = entry.getValue();
+ Map<ImmutableBytesPtr,RowMutationState> srcRows = entry.getValue();
joinMutationState(tableRef, srcRows, dstMutations);
}
}
@@ -448,7 +439,19 @@ public class MutationState implements SQLCloseable {
phoenixTransactionContext.join(newMutationState.getPhoenixTransactionContext());
this.sizeOffset += newMutationState.sizeOffset;
+ int oldNumRows = this.numRows;
joinMutationState(newMutationState.mutations, this.mutations);
+ if (newMutationState.numRows>0) {
+ // if we added all the rows from newMutationState we can just increment the
+ // estimatedSize by newMutationState.estimatedSize
+ if (newMutationState.numRows == this.numRows-oldNumRows) {
+ this.estimatedSize += newMutationState.estimatedSize;
+ }
+ // we merged the two mutation states so we need to recalculate the size
+ else {
+ this.estimatedSize = KeyValueUtil.getEstimatedRowMutationSize(this.mutations);
+ }
+ }
if (!newMutationState.txMutations.isEmpty()) {
if (txMutations.isEmpty()) {
txMutations = Maps.newHashMapWithExpectedSize(mutations.size());
@@ -486,7 +489,7 @@ public class MutationState implements SQLCloseable {
return ptr;
}
- private Iterator<Pair<PName,List<Mutation>>> addRowMutations(final TableRef tableRef, final MultiRowMutationState values,
+ private Iterator<Pair<PName,List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, RowMutationState> values,
final long mutationTimestamp, final long serverTimestamp, boolean includeAllIndexes, final boolean sendAll) {
final PTable table = tableRef.getTable();
final Iterator<PTable> indexes = // Only maintain tables with immutable rows through this client-side mechanism
@@ -521,10 +524,10 @@ public class MutationState implements SQLCloseable {
// we may also have to include delete mutations for immutable tables if we are not processing all the tables in the mutations map
if (!sendAll) {
TableRef key = new TableRef(index);
- MultiRowMutationState multiRowMutationState = mutations.remove(key);
- if (multiRowMutationState!=null) {
+ Map<ImmutableBytesPtr, RowMutationState> rowToColumnMap = mutations.remove(key);
+ if (rowToColumnMap!=null) {
final List<Mutation> deleteMutations = Lists.newArrayList();
- generateMutations(tableRef, mutationTimestamp, serverTimestamp, multiRowMutationState, deleteMutations, null);
+ generateMutations(tableRef, mutationTimestamp, serverTimestamp, rowToColumnMap, deleteMutations, null);
indexMutations.addAll(deleteMutations);
}
}
@@ -543,14 +546,14 @@ public class MutationState implements SQLCloseable {
}
private void generateMutations(final TableRef tableRef, final long mutationTimestamp,
- final long serverTimestamp, final MultiRowMutationState values,
+ final long serverTimestamp, final Map<ImmutableBytesPtr, RowMutationState> values,
final List<Mutation> mutationList, final List<Mutation> mutationsPertainingToIndex) {
final PTable table = tableRef.getTable();
boolean tableWithRowTimestampCol = table.getRowTimestampColPos() != -1;
Iterator<Map.Entry<ImmutableBytesPtr, RowMutationState>> iterator =
values.entrySet().iterator();
long timestampToUse = mutationTimestamp;
- MultiRowMutationState modifiedValues = new MultiRowMutationState(16);
+ Map<ImmutableBytesPtr, RowMutationState> modifiedValues = Maps.newHashMap();
while (iterator.hasNext()) {
Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry = iterator.next();
byte[] onDupKeyBytes = rowEntry.getValue().getOnDupKeyBytes();
@@ -625,7 +628,7 @@ public class MutationState implements SQLCloseable {
}
public Iterator<Pair<byte[],List<Mutation>>> toMutations(final boolean includeMutableIndexes, final Long tableTimestamp) {
- final Iterator<Map.Entry<TableRef, MultiRowMutationState>> iterator = this.mutations.entrySet().iterator();
+ final Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>>> iterator = this.mutations.entrySet().iterator();
if (!iterator.hasNext()) {
return Collections.emptyIterator();
}
@@ -633,7 +636,7 @@ public class MutationState implements SQLCloseable {
final long serverTimestamp = getTableTimestamp(tableTimestamp, scn);
final long mutationTimestamp = getMutationTimestamp(scn);
return new Iterator<Pair<byte[],List<Mutation>>>() {
- private Map.Entry<TableRef, MultiRowMutationState> current = iterator.next();
+ private Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> current = iterator.next();
private Iterator<Pair<byte[],List<Mutation>>> innerIterator = init();
private Iterator<Pair<byte[],List<Mutation>>> init() {
@@ -697,14 +700,14 @@ public class MutationState implements SQLCloseable {
private long[] validateAll() throws SQLException {
int i = 0;
long[] timeStamps = new long[this.mutations.size()];
- for (Map.Entry<TableRef, MultiRowMutationState> entry : mutations.entrySet()) {
+ for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : mutations.entrySet()) {
TableRef tableRef = entry.getKey();
timeStamps[i++] = validateAndGetServerTimestamp(tableRef, entry.getValue());
}
return timeStamps;
}
- private long validateAndGetServerTimestamp(TableRef tableRef, MultiRowMutationState rowKeyToColumnMap) throws SQLException {
+ private long validateAndGetServerTimestamp(TableRef tableRef, Map<ImmutableBytesPtr, RowMutationState> rowKeyToColumnMap) throws SQLException {
Long scn = connection.getSCN();
MetaDataClient client = new MetaDataClient(connection);
long serverTimeStamp = tableRef.getTimeStamp();
@@ -916,7 +919,7 @@ public class MutationState implements SQLCloseable {
sendAll = true;
}
- MultiRowMutationState multiRowMutationState;
+ Map<ImmutableBytesPtr, RowMutationState> valuesMap;
Map<TableInfo,List<Mutation>> physicalTableMutationMap = Maps.newLinkedHashMap();
// add tracing for this operation
try (TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables")) {
@@ -925,16 +928,16 @@ public class MutationState implements SQLCloseable {
while (tableRefIterator.hasNext()) {
// at this point we are going through mutations for each table
final TableRef tableRef = tableRefIterator.next();
- multiRowMutationState = mutations.get(tableRef);
- if (multiRowMutationState == null || multiRowMutationState.isEmpty()) {
+ valuesMap = mutations.get(tableRef);
+ if (valuesMap == null || valuesMap.isEmpty()) {
continue;
}
// Validate as we go if transactional since we can undo if a problem occurs (which is unlikely)
- long serverTimestamp = serverTimeStamps == null ? validateAndGetServerTimestamp(tableRef, multiRowMutationState) : serverTimeStamps[i++];
+ long serverTimestamp = serverTimeStamps == null ? validateAndGetServerTimestamp(tableRef, valuesMap) : serverTimeStamps[i++];
Long scn = connection.getSCN();
long mutationTimestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
final PTable table = tableRef.getTable();
- Iterator<Pair<PName,List<Mutation>>> mutationsIterator = addRowMutations(tableRef, multiRowMutationState, mutationTimestamp, serverTimestamp, false, sendAll);
+ Iterator<Pair<PName,List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, mutationTimestamp, serverTimestamp, false, sendAll);
// build map from physical table to mutation list
boolean isDataTable = true;
while (mutationsIterator.hasNext()) {
@@ -952,7 +955,7 @@ public class MutationState implements SQLCloseable {
// involved in the transaction since none of them would have been
// committed in the event of a failure.
if (table.isTransactional()) {
- addUncommittedStatementIndexes(multiRowMutationState.values());
+ addUncommittedStatementIndexes(valuesMap.values());
if (txMutations.isEmpty()) {
txMutations = Maps.newHashMapWithExpectedSize(mutations.size());
}
@@ -961,7 +964,7 @@ public class MutationState implements SQLCloseable {
// in the event that we need to replay the commit.
// Copy TableRef so we have the original PTable and know when the
// indexes have changed.
- joinMutationState(new TableRef(tableRef), multiRowMutationState, txMutations);
+ joinMutationState(new TableRef(tableRef), valuesMap, txMutations);
}
}
long serverTimestamp = HConstants.LATEST_TIMESTAMP;
@@ -1185,7 +1188,7 @@ public class MutationState implements SQLCloseable {
}
private int[] getUncommittedStatementIndexes() {
- for (MultiRowMutationState rowMutationMap : mutations.values()) {
+ for (Map<ImmutableBytesPtr, RowMutationState> rowMutationMap : mutations.values()) {
addUncommittedStatementIndexes(rowMutationMap.values());
}
return uncommittedStatementIndexes;
@@ -1218,7 +1221,7 @@ public class MutationState implements SQLCloseable {
}
public void commit() throws SQLException {
- Map<TableRef, MultiRowMutationState> txMutations = Collections.emptyMap();
+ Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations = Collections.emptyMap();
int retryCount = 0;
do {
boolean sendSuccessful=false;
@@ -1428,46 +1431,6 @@ public class MutationState implements SQLCloseable {
}
}
- public static class MultiRowMutationState {
- private Map<ImmutableBytesPtr,RowMutationState> rowKeyToRowMutationState;
- private long estimatedSize;
-
- public MultiRowMutationState(int size) {
- this.rowKeyToRowMutationState = Maps.newHashMapWithExpectedSize(size);
- this.estimatedSize = 0;
- }
-
- public RowMutationState put(ImmutableBytesPtr ptr, RowMutationState rowMutationState) {
- estimatedSize += rowMutationState.calculateEstimatedSize();
- return rowKeyToRowMutationState.put(ptr, rowMutationState);
- }
-
- public void putAll(MultiRowMutationState other) {
- estimatedSize += other.estimatedSize;
- rowKeyToRowMutationState.putAll(other.rowKeyToRowMutationState);
- }
-
- public boolean isEmpty() {
- return rowKeyToRowMutationState.isEmpty();
- }
-
- public int size() {
- return rowKeyToRowMutationState.size();
- }
-
- public Set<Entry<ImmutableBytesPtr, RowMutationState>> entrySet() {
- return rowKeyToRowMutationState.entrySet();
- }
-
- public void clear(){
- rowKeyToRowMutationState.clear();
- }
-
- public Collection<RowMutationState> values() {
- return rowKeyToRowMutationState.values();
- }
- }
-
public static class RowMutationState {
@Nonnull private Map<PColumn,byte[]> columnValues;
private int[] statementIndexes;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/355ee522/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 74f91b4..b23ea1b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -73,7 +73,7 @@ import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
-import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
+import org.apache.phoenix.execute.MutationState.RowMutationState;
import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.KeyValueColumnExpression;
@@ -296,7 +296,7 @@ public class IndexUtil {
}
public static List<Mutation> generateIndexData(final PTable table, PTable index,
- final MultiRowMutationState multiRowMutationState, List<Mutation> dataMutations, final KeyValueBuilder kvBuilder, PhoenixConnection connection)
+ final Map<ImmutableBytesPtr, RowMutationState> valuesMap, List<Mutation> dataMutations, final KeyValueBuilder kvBuilder, PhoenixConnection connection)
throws SQLException {
try {
final ImmutableBytesPtr ptr = new ImmutableBytesPtr();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/355ee522/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
index df6a349..318c9d6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
import org.apache.phoenix.execute.MutationState.RowMutationState;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
@@ -189,10 +188,10 @@ public class KeyValueUtil {
* @return estimated row size
*/
public static long
- getEstimatedRowMutationSize(Map<TableRef, MultiRowMutationState> tableMutationMap) {
+ getEstimatedRowMutationSize(Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> tableMutationMap) {
long size = 0;
// iterate over table
- for (Entry<TableRef, MultiRowMutationState> tableEntry : tableMutationMap.entrySet()) {
+ for (Entry<TableRef, Map<ImmutableBytesPtr, RowMutationState>> tableEntry : tableMutationMap.entrySet()) {
// iterate over rows
for (Entry<ImmutableBytesPtr, RowMutationState> rowEntry : tableEntry.getValue().entrySet()) {
size += calculateRowMutationSize(rowEntry);