You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ra...@apache.org on 2017/11/14 04:30:35 UTC
[3/4] phoenix git commit: PHOENIX-4305 Make use of Cell interface
APIs wherever possible. (Rajeshbabu)
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
index c004818..68b36f5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
@@ -52,7 +52,7 @@ import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
-import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.PhoenixKeyValueUtil;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.SequenceUtil;
import org.apache.phoenix.util.ServerUtil;
@@ -84,7 +84,7 @@ public class SequenceRegionObserver implements RegionObserver {
byte[] errorCodeBuf = new byte[PInteger.INSTANCE.getByteSize()];
PInteger.INSTANCE.getCodec().encodeInt(errorCode, errorCodeBuf, 0);
return Result.create(Collections.singletonList(
- (Cell)KeyValueUtil.newKeyValue(row,
+ PhoenixKeyValueUtil.newKeyValue(row,
PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES,
QueryConstants.EMPTY_COLUMN_BYTES, timestamp, errorCodeBuf)));
}
@@ -139,9 +139,9 @@ public class SequenceRegionObserver implements RegionObserver {
}
- KeyValue currentValueKV = Sequence.getCurrentValueKV(result);
- KeyValue incrementByKV = Sequence.getIncrementByKV(result);
- KeyValue cacheSizeKV = Sequence.getCacheSizeKV(result);
+ Cell currentValueKV = Sequence.getCurrentValueKV(result);
+ Cell incrementByKV = Sequence.getIncrementByKV(result);
+ Cell cacheSizeKV = Sequence.getCacheSizeKV(result);
long currentValue = PLong.INSTANCE.getCodec().decodeLong(currentValueKV.getValueArray(), currentValueKV.getValueOffset(), SortOrder.getDefault());
long incrementBy = PLong.INSTANCE.getCodec().decodeLong(incrementByKV.getValueArray(), incrementByKV.getValueOffset(), SortOrder.getDefault());
@@ -161,15 +161,15 @@ public class SequenceRegionObserver implements RegionObserver {
currentValue += incrementBy * cacheSize;
// Hold timestamp constant for sequences, so that clients always only see the latest value
// regardless of when they connect.
- KeyValue newCurrentValueKV = createKeyValue(row, PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES, currentValue, timestamp);
+ Cell newCurrentValueKV = createKeyValue(row, PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES, currentValue, timestamp);
put.add(newCurrentValueKV);
Sequence.replaceCurrentValueKV(cells, newCurrentValueKV);
}
else {
- KeyValue cycleKV = Sequence.getCycleKV(result);
- KeyValue limitReachedKV = Sequence.getLimitReachedKV(result);
- KeyValue minValueKV = Sequence.getMinValueKV(result);
- KeyValue maxValueKV = Sequence.getMaxValueKV(result);
+ Cell cycleKV = Sequence.getCycleKV(result);
+ Cell limitReachedKV = Sequence.getLimitReachedKV(result);
+ Cell minValueKV = Sequence.getMinValueKV(result);
+ Cell maxValueKV = Sequence.getMaxValueKV(result);
boolean increasingSeq = incrementBy > 0 ? true : false;
@@ -179,7 +179,7 @@ public class SequenceRegionObserver implements RegionObserver {
boolean limitReached;
if (limitReachedKV == null) {
limitReached = false;
- KeyValue newLimitReachedKV = createKeyValue(row, PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG_BYTES, limitReached, timestamp);
+ Cell newLimitReachedKV = createKeyValue(row, PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG_BYTES, limitReached, timestamp);
put.add(newLimitReachedKV);
Sequence.replaceLimitReachedKV(cells, newLimitReachedKV);
}
@@ -190,7 +190,7 @@ public class SequenceRegionObserver implements RegionObserver {
long minValue;
if (minValueKV == null) {
minValue = Long.MIN_VALUE;
- KeyValue newMinValueKV = createKeyValue(row, PhoenixDatabaseMetaData.MIN_VALUE_BYTES, minValue, timestamp);
+ Cell newMinValueKV = createKeyValue(row, PhoenixDatabaseMetaData.MIN_VALUE_BYTES, minValue, timestamp);
put.add(newMinValueKV);
Sequence.replaceMinValueKV(cells, newMinValueKV);
}
@@ -201,7 +201,7 @@ public class SequenceRegionObserver implements RegionObserver {
long maxValue;
if (maxValueKV == null) {
maxValue = Long.MAX_VALUE;
- KeyValue newMaxValueKV = createKeyValue(row, PhoenixDatabaseMetaData.MAX_VALUE_BYTES, maxValue, timestamp);
+ Cell newMaxValueKV = createKeyValue(row, PhoenixDatabaseMetaData.MAX_VALUE_BYTES, maxValue, timestamp);
put.add(newMaxValueKV);
Sequence.replaceMaxValueKV(cells, newMaxValueKV);
}
@@ -212,7 +212,7 @@ public class SequenceRegionObserver implements RegionObserver {
boolean cycle;
if (cycleKV == null) {
cycle = false;
- KeyValue newCycleKV = createKeyValue(row, PhoenixDatabaseMetaData.CYCLE_FLAG_BYTES, cycle, timestamp);
+ Cell newCycleKV = createKeyValue(row, PhoenixDatabaseMetaData.CYCLE_FLAG_BYTES, cycle, timestamp);
put.add(newCycleKV);
Sequence.replaceCycleValueKV(cells, newCycleKV);
}
@@ -260,11 +260,11 @@ public class SequenceRegionObserver implements RegionObserver {
// update currentValue
currentValue += incrementBy * (SequenceUtil.isBulkAllocation(numSlotsToAllocate) ? numSlotsToAllocate : cacheSize);
// update the currentValue of the Result row
- KeyValue newCurrentValueKV = createKeyValue(row, PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES, currentValue, timestamp);
+ Cell newCurrentValueKV = createKeyValue(row, PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES, currentValue, timestamp);
Sequence.replaceCurrentValueKV(cells, newCurrentValueKV);
put.add(newCurrentValueKV);
// set the LIMIT_REACHED column to true, so that no new values can be used
- KeyValue newLimitReachedKV = createKeyValue(row, PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG_BYTES, limitReached, timestamp);
+ Cell newLimitReachedKV = createKeyValue(row, PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG_BYTES, limitReached, timestamp);
put.add(newLimitReachedKV);
}
// update the KeyValues on the server
@@ -293,10 +293,10 @@ public class SequenceRegionObserver implements RegionObserver {
* column qualifier of KeyValue
* @return return the KeyValue that was created
*/
- KeyValue createKeyValue(byte[] key, byte[] cqBytes, long value, long timestamp) {
+ Cell createKeyValue(byte[] key, byte[] cqBytes, long value, long timestamp) {
byte[] valueBuffer = new byte[PLong.INSTANCE.getByteSize()];
- PLong.INSTANCE.getCodec().encodeLong(value, valueBuffer, 0);
- return KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, cqBytes, timestamp, valueBuffer);
+ PLong.INSTANCE.getCodec().encodeLong(value, valueBuffer, 0);
+ return PhoenixKeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, cqBytes, timestamp, valueBuffer);
}
/**
@@ -308,9 +308,9 @@ public class SequenceRegionObserver implements RegionObserver {
* column qualifier of KeyValue
* @return return the KeyValue that was created
*/
- private KeyValue createKeyValue(byte[] key, byte[] cqBytes, boolean value, long timestamp) throws IOException {
+ private Cell createKeyValue(byte[] key, byte[] cqBytes, boolean value, long timestamp) throws IOException {
// create new key value for put
- return KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, cqBytes,
+ return PhoenixKeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, cqBytes,
timestamp, value ? PDataType.TRUE_BYTES : PDataType.FALSE_BYTES);
}
@@ -397,7 +397,7 @@ public class SequenceRegionObserver implements RegionObserver {
// Timestamp should match exactly, or we may have the wrong sequence
if (expectedValue != value || currentValueKV.getTimestamp() != clientTimestamp) {
return Result.create(Collections.singletonList(
- (Cell)KeyValueUtil.newKeyValue(row, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES,
+ (Cell)PhoenixKeyValueUtil.newKeyValue(row, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES,
QueryConstants.EMPTY_COLUMN_BYTES, currentValueKV.getTimestamp(), ByteUtil.EMPTY_BYTE_ARRAY)));
}
m = new Put(row, currentValueKV.getTimestamp());
@@ -425,7 +425,7 @@ public class SequenceRegionObserver implements RegionObserver {
// the client cares about is the timestamp, which is the timestamp of
// when the mutation was actually performed (useful in the case of .
return Result.create(Collections.singletonList(
- (Cell)KeyValueUtil.newKeyValue(row, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, serverTimestamp, SUCCESS_VALUE)));
+ (Cell)PhoenixKeyValueUtil.newKeyValue(row, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, serverTimestamp, SUCCESS_VALUE)));
} finally {
ServerUtil.releaseRowLocks(locks);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index ab6309c..82bfc07 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -131,7 +131,7 @@ import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ExpressionUtil;
import org.apache.phoenix.util.IndexUtil;
-import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.PhoenixKeyValueUtil;
import org.apache.phoenix.util.LogUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
@@ -332,7 +332,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
public boolean add(Mutation e) {
boolean r = super.add(e);
if (r) {
- this.byteSize += KeyValueUtil.calculateMutationDiskSize(e);
+ this.byteSize += PhoenixKeyValueUtil.calculateMutationDiskSize(e);
}
return r;
}
@@ -797,12 +797,12 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
final boolean hadAny = hasAny;
- KeyValue keyValue = null;
+ Cell keyValue = null;
if (hadAny) {
byte[] value = aggregators.toBytes(rowAggregators);
- keyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length);
+ keyValue = PhoenixKeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length);
}
- final KeyValue aggKeyValue = keyValue;
+ final Cell aggKeyValue = keyValue;
RegionScanner scanner = new BaseRegionScanner(innerScanner) {
private boolean done = !hadAny;
@@ -1096,7 +1096,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
region.closeRegionOperation();
}
byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount));
- final KeyValue aggKeyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY,
+ final Cell aggKeyValue = PhoenixKeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY,
SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length);
RegionScanner scanner = new BaseRegionScanner(innerScanner) {
@@ -1154,8 +1154,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
+ region.getRegionInfo().getRegionNameAsString());
}
byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount));
- final KeyValue aggKeyValue =
- KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY,
+ final Cell aggKeyValue =
+ PhoenixKeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY,
SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length);
RegionScanner scanner = new BaseRegionScanner(innerScanner) {
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index b0974c6..04ed864 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -90,7 +90,7 @@ import org.apache.phoenix.transaction.PhoenixTransactionalTable;
import org.apache.phoenix.transaction.TransactionFactory;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.IndexUtil;
-import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.PhoenixKeyValueUtil;
import org.apache.phoenix.util.LogUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.SQLCloseable;
@@ -355,7 +355,7 @@ public class MutationState implements SQLCloseable {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.MAX_MUTATION_SIZE_EXCEEDED).build()
.buildException();
}
- long estimatedSize = KeyValueUtil.getEstimatedRowSize(mutations);
+ long estimatedSize = PhoenixKeyValueUtil.getEstimatedRowSize(mutations);
if (estimatedSize > maxSizeBytes) {
resetState();
throw new SQLExceptionInfo.Builder(SQLExceptionCode.MAX_MUTATION_SIZE_BYTES_EXCEEDED)
@@ -746,7 +746,7 @@ public class MutationState implements SQLCloseable {
long byteSize = 0;
if (GlobalClientMetrics.isMetricsEnabled()) {
for (Mutation mutation : mutations) {
- byteSize += KeyValueUtil.calculateMutationDiskSize(mutation);
+ byteSize += PhoenixKeyValueUtil.calculateMutationDiskSize(mutation);
}
}
GLOBAL_MUTATION_BYTES.update(byteSize);
@@ -891,7 +891,6 @@ public class MutationState implements SQLCloseable {
}
- @SuppressWarnings("deprecation")
private void send(Iterator<TableRef> tableRefIterator) throws SQLException {
int i = 0;
long[] serverTimeStamps = null;
@@ -1085,7 +1084,7 @@ public class MutationState implements SQLCloseable {
List<Mutation> currentList = Lists.newArrayList();
long currentBatchSizeBytes = 0L;
for (Mutation mutation : allMutationList) {
- long mutationSizeBytes = KeyValueUtil.calculateMutationDiskSize(mutation);
+ long mutationSizeBytes = PhoenixKeyValueUtil.calculateMutationDiskSize(mutation);
if (currentList.size() == batchSize || currentBatchSizeBytes + mutationSizeBytes > batchSizeBytes) {
if (currentList.size() > 0) {
mutationBatchList.add(currentList);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
index fab7c59..395a699 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@ -33,7 +33,6 @@ import java.util.Queue;
import java.util.Set;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -67,6 +66,7 @@ import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.ValueBitSet;
import org.apache.phoenix.schema.tuple.ResultTuple;
import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.PhoenixKeyValueUtil;
import org.apache.phoenix.util.ResultUtil;
import org.apache.phoenix.util.SchemaUtil;
@@ -688,14 +688,14 @@ public class SortMergeJoinPlan implements QueryPlan {
@Override
protected int sizeOf(Tuple e) {
- KeyValue kv = KeyValueUtil.ensureKeyValue(e.getValue(0));
+ KeyValue kv = PhoenixKeyValueUtil.maybeCopyCell(e.getValue(0));
return Bytes.SIZEOF_INT * 2 + kv.getLength();
}
@SuppressWarnings("deprecation")
@Override
protected void writeToBuffer(MappedByteBuffer buffer, Tuple e) {
- KeyValue kv = KeyValueUtil.ensureKeyValue(e.getValue(0));
+ KeyValue kv = PhoenixKeyValueUtil.maybeCopyCell(e.getValue(0));
buffer.putInt(kv.getLength() + Bytes.SIZEOF_INT);
buffer.putInt(kv.getLength());
buffer.put(kv.getBuffer(), kv.getOffset(), kv.getLength());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
index 266bb6e..753c11d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
@@ -30,7 +30,6 @@ import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
@@ -48,7 +47,7 @@ import org.apache.phoenix.schema.ProjectedColumn;
import org.apache.phoenix.schema.ValueBitSet;
import org.apache.phoenix.schema.tuple.BaseTuple;
import org.apache.phoenix.schema.tuple.Tuple;
-import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.PhoenixKeyValueUtil;
import org.apache.phoenix.util.SchemaUtil;
import com.google.common.base.Preconditions;
@@ -171,7 +170,7 @@ public class TupleProjector {
long timestamp;
ImmutableBytesWritable projectedValue = new ImmutableBytesWritable();
int bitSetLen;
- KeyValue keyValue;
+ Cell keyValue;
public ProjectedValueTuple(Tuple keyBase, long timestamp, byte[] projectedValue, int valueOffset, int valueLength, int bitSetLen) {
keyBase.getKey(this.keyPtr);
@@ -209,7 +208,7 @@ public class TupleProjector {
}
@Override
- public KeyValue getValue(int index) {
+ public Cell getValue(int index) {
if (index != 0) {
throw new IndexOutOfBoundsException(Integer.toString(index));
}
@@ -217,9 +216,9 @@ public class TupleProjector {
}
@Override
- public KeyValue getValue(byte[] family, byte[] qualifier) {
+ public Cell getValue(byte[] family, byte[] qualifier) {
if (keyValue == null) {
- keyValue = KeyValueUtil.newKeyValue(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength(),
+ keyValue = PhoenixKeyValueUtil.newKeyValue(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength(),
VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER, timestamp, projectedValue.get(), projectedValue.getOffset(), projectedValue.getLength());
}
return keyValue;
@@ -256,15 +255,15 @@ public class TupleProjector {
}
@Override
- public KeyValue getValue(int index) {
+ public Cell getValue(int index) {
if (index != 0) { throw new IndexOutOfBoundsException(Integer.toString(index)); }
return getValue(VALUE_COLUMN_FAMILY, OLD_VALUE_COLUMN_QUALIFIER);
}
@Override
- public KeyValue getValue(byte[] family, byte[] qualifier) {
+ public Cell getValue(byte[] family, byte[] qualifier) {
if (keyValue == null) {
- keyValue = KeyValueUtil.newKeyValue(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength(),
+ keyValue = PhoenixKeyValueUtil.newKeyValue(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength(),
VALUE_COLUMN_FAMILY, OLD_VALUE_COLUMN_QUALIFIER, timestamp, projectedValue.get(),
projectedValue.getOffset(), projectedValue.getLength());
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/filter/DistinctPrefixFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/DistinctPrefixFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/DistinctPrefixFilter.java
index 1280cb5..e30a6eb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/DistinctPrefixFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/DistinctPrefixFilter.java
@@ -22,7 +22,7 @@ import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -113,7 +113,7 @@ public class DistinctPrefixFilter extends FilterBase implements Writable {
}
}
}
- return KeyValue.createFirstOnRow(tmp.get(), tmp.getOffset(), tmp.getLength(), null, 0, 0,
+ return KeyValueUtil.createFirstOnRow(tmp.get(), tmp.getOffset(), tmp.getLength(), null, 0, 0,
null, 0, 0);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
index a2edd45..ee7f074 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
@@ -17,7 +17,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -115,7 +115,7 @@ public abstract class BaseIndexBuilder implements IndexBuilder {
}
@Override
- public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(Collection<KeyValue> filtered, IndexMetaData context)
+ public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(Collection<Cell> filtered, IndexMetaData context)
throws IOException {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
index a00294c..489c40e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
@@ -22,7 +22,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Increment;
@@ -96,7 +96,7 @@ public interface IndexBuilder extends Stoppable {
* @throws IOException on failure
*/
public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(
- Collection<KeyValue> filtered, IndexMetaData context)
+ Collection<Cell> filtered, IndexMetaData context)
throws IOException;
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/Batch.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/Batch.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/Batch.java
index e707ea2..722d64c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/Batch.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/Batch.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.hbase.index.covered;
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
/**
@@ -29,7 +30,7 @@ public class Batch {
private static final long pointDeleteCode = KeyValue.Type.Delete.getCode();
private final long timestamp;
- private List<KeyValue> batch = new ArrayList<KeyValue>();
+ private List<Cell> batch = new ArrayList<Cell>();
private boolean allPointDeletes = true;
/**
@@ -39,8 +40,8 @@ public class Batch {
this.timestamp = ts;
}
- public void add(KeyValue kv){
- if (pointDeleteCode != kv.getType()) {
+ public void add(Cell kv){
+ if (pointDeleteCode != kv.getTypeByte()) {
allPointDeletes = false;
}
batch.add(kv);
@@ -54,7 +55,7 @@ public class Batch {
return this.timestamp;
}
- public List<KeyValue> getKvs() {
+ public List<Cell> getKvs() {
return this.batch;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/KeyValueStore.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/KeyValueStore.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/KeyValueStore.java
index 30d2904..0848e29 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/KeyValueStore.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/KeyValueStore.java
@@ -17,7 +17,7 @@
*/
package org.apache.phoenix.hbase.index.covered;
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Cell;
import org.apache.phoenix.hbase.index.scanner.ReseekableScanner;
/**
@@ -25,9 +25,9 @@ import org.apache.phoenix.hbase.index.scanner.ReseekableScanner;
*/
public interface KeyValueStore {
- public void add(KeyValue kv, boolean overwrite);
+ public void add(Cell kv, boolean overwrite);
public ReseekableScanner getScanner();
- public void rollback(KeyValue kv);
+ public void rollback(Cell kv);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
index f7784e5..f89a896 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
@@ -20,7 +20,6 @@ import java.util.Set;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Pair;
@@ -51,7 +50,7 @@ public class LocalTableState implements TableState {
private Mutation update;
private Set<ColumnTracker> trackedColumns = new HashSet<ColumnTracker>();
private ScannerBuilder scannerBuilder;
- private List<KeyValue> kvs = new ArrayList<KeyValue>();
+ private List<Cell> kvs = new ArrayList<Cell>();
private List<? extends IndexedColumnGroup> hints;
private CoveredColumns columnSet;
@@ -64,24 +63,24 @@ public class LocalTableState implements TableState {
this.columnSet = new CoveredColumns();
}
- public void addPendingUpdates(KeyValue... kvs) {
+ public void addPendingUpdates(Cell... kvs) {
if (kvs == null) return;
addPendingUpdates(Arrays.asList(kvs));
}
- public void addPendingUpdates(List<KeyValue> kvs) {
+ public void addPendingUpdates(List<Cell> kvs) {
if (kvs == null) return;
setPendingUpdates(kvs);
addUpdate(kvs);
}
- private void addUpdate(List<KeyValue> list) {
+ private void addUpdate(List<Cell> list) {
addUpdate(list, true);
}
- private void addUpdate(List<KeyValue> list, boolean overwrite) {
+ private void addUpdate(List<Cell> list, boolean overwrite) {
if (list == null) return;
- for (KeyValue kv : list) {
+ for (Cell kv : list) {
this.memstore.add(kv, overwrite);
}
}
@@ -90,20 +89,10 @@ public class LocalTableState implements TableState {
if (list == null) return;
// Avoid a copy of the Cell into a KeyValue if it's already a KeyValue
for (Cell c : list) {
- this.memstore.add(maybeCopyCell(c), overwrite);
+ this.memstore.add(c, overwrite);
}
}
- private KeyValue maybeCopyCell(Cell c) {
- // Same as KeyValueUtil, but HBase has deprecated this method. Avoid depending on something
- // that will likely be removed at some point in time.
- if (c == null) return null;
- if (c instanceof KeyValue) {
- return (KeyValue) c;
- }
- return KeyValueUtil.copyToNewKeyValue(c);
- }
-
@Override
public RegionCoprocessorEnvironment getEnvironment() {
return this.env;
@@ -240,7 +229,7 @@ public class LocalTableState implements TableState {
}
@Override
- public Collection<KeyValue> getPendingUpdate() {
+ public Collection<Cell> getPendingUpdate() {
return this.kvs;
}
@@ -251,7 +240,7 @@ public class LocalTableState implements TableState {
* @param update
* pending {@link KeyValue}s
*/
- public void setPendingUpdates(Collection<KeyValue> update) {
+ public void setPendingUpdates(Collection<Cell> update) {
this.kvs.clear();
this.kvs.addAll(update);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
index 8dd57c0..4adc7b9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
@@ -85,8 +85,7 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
long ts = m.getFamilyCellMap().values().iterator().next().iterator().next().getTimestamp();
Batch batch = new Batch(ts);
for (List<Cell> family : m.getFamilyCellMap().values()) {
- List<KeyValue> kvs = KeyValueUtil.ensureKeyValues(family);
- for (KeyValue kv : kvs) {
+ for (Cell kv : family) {
batch.add(kv);
if(ts != kv.getTimestamp()) {
throw new IllegalStateException("Time stamps must match for all cells in a batch");
@@ -256,7 +255,7 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
}
@Override
- public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(Collection<KeyValue> filtered, IndexMetaData indexMetaData)
+ public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(Collection<Cell> filtered, IndexMetaData indexMetaData)
throws IOException {
// TODO Implement IndexBuilder.getIndexUpdateForFilteredRows
return null;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
index f85de59..f520673 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
@@ -23,7 +23,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Pair;
@@ -84,5 +84,5 @@ public interface TableState {
* Can be used to help the codec to determine which columns it should attempt to index.
* @return the keyvalues in the pending update to the table.
*/
- Collection<KeyValue> getPendingUpdate();
+ Collection<Cell> getPendingUpdate();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/IndexMemStore.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/IndexMemStore.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/IndexMemStore.java
index 0fc9e14..dfd3774 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/IndexMemStore.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/IndexMemStore.java
@@ -17,16 +17,15 @@
*/
package org.apache.phoenix.hbase.index.covered.data;
-import java.util.Comparator;
import java.util.Iterator;
import java.util.SortedSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.KVComparator;
-import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.IndexKeyValueSkipListSet;
import org.apache.hadoop.hbase.regionserver.MemStore;
@@ -34,6 +33,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.hbase.index.covered.KeyValueStore;
import org.apache.phoenix.hbase.index.covered.LocalTableState;
import org.apache.phoenix.hbase.index.scanner.ReseekableScanner;
+import org.apache.phoenix.util.PhoenixKeyValueUtil;
/**
* Like the HBase {@link MemStore}, but without all that extra work around maintaining snapshots and
@@ -76,27 +76,10 @@ public class IndexMemStore implements KeyValueStore {
private static final Log LOG = LogFactory.getLog(IndexMemStore.class);
private IndexKeyValueSkipListSet kvset;
- private Comparator<KeyValue> comparator;
-
- /**
- * Compare two {@link KeyValue}s based only on their row keys. Similar to the standard
- * {@link KeyValue#COMPARATOR}, but doesn't take into consideration the memstore timestamps. We
- * instead manage which KeyValue to retain based on how its loaded here
- */
- public static final Comparator<KeyValue> COMPARATOR = new Comparator<KeyValue>() {
-
- private final KVComparator rawcomparator = new KVComparator();
-
- @Override
- public int compare(final KeyValue left, final KeyValue right) {
- return rawcomparator.compareFlatKey(left.getRowArray(), left.getOffset() + KeyValue.ROW_OFFSET,
- left.getKeyLength(), right.getRowArray(), right.getOffset() + KeyValue.ROW_OFFSET,
- right.getKeyLength());
- }
- };
+ private CellComparator comparator;
public IndexMemStore() {
- this(COMPARATOR);
+ this(CellComparatorImpl.COMPARATOR);
}
/**
@@ -106,13 +89,13 @@ public class IndexMemStore implements KeyValueStore {
* Exposed for subclassing/testing.
* @param comparator to use
*/
- IndexMemStore(Comparator<KeyValue> comparator) {
+ IndexMemStore(CellComparator comparator) {
this.comparator = comparator;
this.kvset = IndexKeyValueSkipListSet.create(comparator);
}
@Override
- public void add(KeyValue kv, boolean overwrite) {
+ public void add(Cell kv, boolean overwrite) {
if (LOG.isTraceEnabled()) {
LOG.trace("Inserting: " + toString(kv));
}
@@ -131,19 +114,19 @@ public class IndexMemStore implements KeyValueStore {
private void dump() {
LOG.trace("Current kv state:\n");
- for (KeyValue kv : this.kvset) {
+ for (Cell kv : this.kvset) {
LOG.trace("KV: " + toString(kv));
}
LOG.trace("========== END MemStore Dump ==================\n");
}
- private String toString(KeyValue kv) {
+ private String toString(Cell kv) {
return kv.toString() + "/value=" +
Bytes.toStringBinary(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
}
@Override
- public void rollback(KeyValue kv) {
+ public void rollback(Cell kv) {
if (LOG.isTraceEnabled()) {
LOG.trace("Rolling back: " + toString(kv));
}
@@ -169,13 +152,13 @@ public class IndexMemStore implements KeyValueStore {
// set, rather than a primary and a secondary set of KeyValues.
protected class MemStoreScanner implements ReseekableScanner {
// Next row information for the set
- private KeyValue nextRow = null;
+ private Cell nextRow = null;
// last iterated KVs for kvset and snapshot (to restore iterator state after reseek)
- private KeyValue kvsetItRow = null;
+ private Cell kvsetItRow = null;
// iterator based scanning.
- private Iterator<KeyValue> kvsetIt;
+ private Iterator<Cell> kvsetIt;
// The kvset at the time of creating this scanner
volatile IndexKeyValueSkipListSet kvsetAtCreation;
@@ -185,12 +168,12 @@ public class IndexMemStore implements KeyValueStore {
kvsetAtCreation = kvset;
}
- private KeyValue getNext(Iterator<KeyValue> it) {
+ private Cell getNext(Iterator<Cell> it) {
// in the original implementation we cared about the current thread's readpoint from MVCC.
// However, we don't need to worry here because everything the index can see, is also visible
// to the client (or is the pending primary table update, so it will be once the index is
// written, so it might as well be).
- KeyValue v = null;
+ Cell v = null;
try {
while (it.hasNext()) {
v = it.next();
@@ -220,7 +203,7 @@ public class IndexMemStore implements KeyValueStore {
// kvset and snapshot will never be null.
// if tailSet can't find anything, SortedSet is empty (not null).
- kvsetIt = kvsetAtCreation.tailSet(KeyValueUtil.ensureKeyValue(key)).iterator();
+ kvsetIt = kvsetAtCreation.tailSet(PhoenixKeyValueUtil.maybeCopyCell(key)).iterator();
kvsetItRow = null;
return seekInSubLists();
@@ -250,7 +233,7 @@ public class IndexMemStore implements KeyValueStore {
* Unfortunately the Java API does not offer a method to get it. So we remember the last keys
* we iterated to and restore the reseeked set to at least that point.
*/
- kvsetIt = kvsetAtCreation.tailSet(getHighest(KeyValueUtil.ensureKeyValue(key), kvsetItRow)).iterator();
+ kvsetIt = kvsetAtCreation.tailSet(getHighest(PhoenixKeyValueUtil.maybeCopyCell(key), kvsetItRow)).iterator();
return seekInSubLists();
}
@@ -258,7 +241,7 @@ public class IndexMemStore implements KeyValueStore {
* Returns the higher of the two key values, or null if they are both null. This uses
* comparator.compare() to compare the KeyValue using the memstore comparator.
*/
- private KeyValue getHighest(KeyValue first, KeyValue second) {
+ private Cell getHighest(Cell first, Cell second) {
if (first == null && second == null) {
return null;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java
index 9968627..5b06910 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java
@@ -20,7 +20,7 @@ package org.apache.phoenix.hbase.index.covered.data;
import java.io.IOException;
import java.util.Collection;
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
@@ -40,7 +40,7 @@ public interface LocalHBaseState {
* @return the full state of the given row. Includes all current versions (even if they are not
* usually visible to the client (unless they are also doing a raw scan)). Never returns a
* <tt>null</tt> {@link Result} - instead, when there is not data for the row, returns a
- * {@link Result} with no stored {@link KeyValue}s.
+ * {@link Result} with no stored {@link Cell}s.
* @throws IOException if there is an issue reading the row
*/
public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover, boolean ignoreNewerMutations)
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/ApplyAndFilterDeletesFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/ApplyAndFilterDeletesFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/ApplyAndFilterDeletesFilter.java
index a1f01ed..67049f5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/ApplyAndFilterDeletesFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/ApplyAndFilterDeletesFilter.java
@@ -27,10 +27,10 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
-import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.util.PhoenixKeyValueUtil;
/**
* Only allow the 'latest' timestamp of each family:qualifier pair, ensuring that they aren't
@@ -101,7 +101,7 @@ public class ApplyAndFilterDeletesFilter extends FilterBase {
@Override
public Cell getNextCellHint(Cell peeked){
- return currentHint.getHint(KeyValueUtil.ensureKeyValue(peeked));
+ return currentHint.getHint(PhoenixKeyValueUtil.maybeCopyCell(peeked));
}
@Override
@@ -111,7 +111,7 @@ public class ApplyAndFilterDeletesFilter extends FilterBase {
return ReturnCode.SKIP;
}
- KeyValue nextKV = KeyValueUtil.ensureKeyValue(next);
+ KeyValue nextKV = PhoenixKeyValueUtil.maybeCopyCell(next);
switch (KeyValue.Type.codeToType(next.getTypeByte())) {
/*
* DeleteFamily will always sort first because those KVs (we assume) don't have qualifiers (or
@@ -170,7 +170,7 @@ public class ApplyAndFilterDeletesFilter extends FilterBase {
* Get the next hint for a given peeked keyvalue
*/
interface Hinter {
- public abstract KeyValue getHint(KeyValue peek);
+ public abstract Cell getHint(Cell peek);
}
/**
@@ -181,10 +181,10 @@ public class ApplyAndFilterDeletesFilter extends FilterBase {
class DeleteFamilyHinter implements Hinter {
@Override
- public KeyValue getHint(KeyValue peeked) {
+ public Cell getHint(Cell peeked) {
// check to see if we have another column to seek
ImmutableBytesPtr nextFamily =
- getNextFamily(new ImmutableBytesPtr(peeked.getBuffer(), peeked.getFamilyOffset(),
+ getNextFamily(new ImmutableBytesPtr(peeked.getFamilyArray(), peeked.getFamilyOffset(),
peeked.getFamilyLength()));
if (nextFamily == null) {
// no known next family, so we can be completely done
@@ -192,8 +192,9 @@ public class ApplyAndFilterDeletesFilter extends FilterBase {
return KeyValue.LOWESTKEY;
}
// there is a valid family, so we should seek to that
- return KeyValue.createFirstOnRow(peeked.getRow(), nextFamily.copyBytesIfNecessary(),
- HConstants.EMPTY_BYTE_ARRAY);
+ return org.apache.hadoop.hbase.KeyValueUtil.createFirstOnRow(peeked.getRowArray(),
+ peeked.getRowOffset(), peeked.getRowLength(), nextFamily.get(),
+ nextFamily.getOffset(), nextFamily.getLength(), HConstants.EMPTY_BYTE_ARRAY, 0, 0);
}
}
@@ -205,8 +206,8 @@ public class ApplyAndFilterDeletesFilter extends FilterBase {
class DeleteColumnHinter implements Hinter {
@Override
- public KeyValue getHint(KeyValue kv) {
- return KeyValueUtil.createLastOnRow(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
+ public Cell getHint(Cell kv) {
+ return org.apache.hadoop.hbase.KeyValueUtil.createLastOnRow(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(),
kv.getQualifierOffset(), kv.getQualifierLength());
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/MaxTimestampFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/MaxTimestampFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/MaxTimestampFilter.java
index a8c7474..ec21946 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/MaxTimestampFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/MaxTimestampFilter.java
@@ -19,9 +19,9 @@ package org.apache.phoenix.hbase.index.covered.filter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.util.PhoenixKeyValueUtil;
/**
* Inclusive filter on the maximum timestamp allowed. Excludes all elements greater than (but not
@@ -42,17 +42,12 @@ public class MaxTimestampFilter extends FilterBase {
// with other filters too much.
KeyValue kv = null;
try {
- kv = KeyValueUtil.ensureKeyValue(currentKV).clone();
+ kv = PhoenixKeyValueUtil.maybeCopyCell(currentKV).clone();
} catch (CloneNotSupportedException e) {
// the exception should not happen at all
throw new IllegalArgumentException(e);
}
- int offset =kv.getTimestampOffset();
- //set the timestamp in the buffer
- byte[] buffer = kv.getBuffer();
- byte[] ts = Bytes.toBytes(this.ts);
- System.arraycopy(ts, 0, buffer, offset, ts.length);
-
+ kv.setTimestamp(ts);
return kv;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/ColumnReference.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/ColumnReference.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/ColumnReference.java
index 00348b3..5aa1037 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/ColumnReference.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/ColumnReference.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.hbase.index.covered.update;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -139,7 +140,7 @@ public class ColumnReference implements Comparable<ColumnReference> {
}
public KeyValue getFirstKeyValueForRow(byte[] row) {
- return KeyValue.createFirstOnRow(row, getFamily(), getQualifier() == ALL_QUALIFIERS ? null
+ return KeyValueUtil.createFirstOnRow(row, getFamily(), getQualifier() == ALL_QUALIFIERS ? null
: getQualifier());
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/FilteredKeyValueScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/FilteredKeyValueScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/FilteredKeyValueScanner.java
index 072b624..6359d6c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/FilteredKeyValueScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/FilteredKeyValueScanner.java
@@ -22,10 +22,10 @@ import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
import org.apache.phoenix.hbase.index.covered.KeyValueStore;
+import org.apache.phoenix.util.PhoenixKeyValueUtil;
/**
* Combine a simplified version of the logic in the ScanQueryMatcher and the KeyValueScanner. We can get away with this
@@ -94,7 +94,7 @@ public class FilteredKeyValueScanner implements ReseekableScanner {
break;
// use a seek hint to find out where we should go
case SEEK_NEXT_USING_HINT:
- delegate.seek(KeyValueUtil.ensureKeyValue(filter.getNextCellHint(peeked)));
+ delegate.seek(PhoenixKeyValueUtil.maybeCopyCell(filter.getNextCellHint(peeked)));
}
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/GenericKeyValueBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/GenericKeyValueBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/GenericKeyValueBuilder.java
index ebffde1..b2bfa0b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/GenericKeyValueBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/GenericKeyValueBuilder.java
@@ -22,8 +22,9 @@ import static org.apache.phoenix.hbase.index.util.ImmutableBytesPtr.copyBytesIfN
import java.util.List;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
@@ -89,8 +90,8 @@ public class GenericKeyValueBuilder extends KeyValueBuilder {
}
@Override
- public KVComparator getKeyValueComparator() {
- return KeyValue.COMPARATOR;
+ public CellComparator getKeyValueComparator() {
+ return CellComparatorImpl.COMPARATOR;
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
index a4a34a1..2d65747 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
@@ -128,7 +128,10 @@ public class IndexManagementUtil {
boolean matches = false;
outer: for (KeyValue kv : update) {
for (ColumnReference ref : columns) {
- if (ref.matchesFamily(kv.getFamily()) && ref.matchesQualifier(kv.getQualifier())) {
+ if (ref.matchesFamily(kv.getFamilyArray(), kv.getFamilyOffset(),
+ kv.getFamilyLength())
+ && ref.matchesQualifier(kv.getQualifierArray(), kv.getQualifierOffset(),
+ kv.getQualifierLength())) {
matches = true;
// if a single column matches a single kv, we need to build a whole scanner
break outer;
@@ -150,7 +153,10 @@ public class IndexManagementUtil {
boolean matches = false;
outer: for (ColumnReference ref : columns) {
for (KeyValue kv : update) {
- if (ref.matchesFamily(kv.getFamily()) && ref.matchesQualifier(kv.getQualifier())) {
+ if (ref.matchesFamily(kv.getFamilyArray(), kv.getFamilyOffset(),
+ kv.getFamilyLength())
+ && ref.matchesQualifier(kv.getQualifierArray(), kv.getQualifierOffset(),
+ kv.getQualifierLength())) {
matches = true;
// if a single column matches a single kv, we need to build a whole scanner
break outer;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/KeyValueBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/KeyValueBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/KeyValueBuilder.java
index 9433abf..c6967cb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/KeyValueBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/KeyValueBuilder.java
@@ -22,10 +22,10 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
@@ -60,7 +60,7 @@ public abstract class KeyValueBuilder {
@SuppressWarnings("javadoc")
public static void deleteQuietly(Delete delete, KeyValueBuilder builder, KeyValue kv) {
try {
- delete.addDeleteMarker(kv);
+ delete.add(kv);
} catch (IOException e) {
throw new RuntimeException("KeyValue Builder " + builder + " created an invalid kv: "
+ kv + "!");
@@ -122,7 +122,7 @@ public abstract class KeyValueBuilder {
*/
public abstract void getValueAsPtr(Cell kv, ImmutableBytesWritable ptr);
- public abstract KVComparator getKeyValueComparator();
+ public abstract CellComparator getKeyValueComparator();
public abstract List<Mutation> cloneIfNecessary(List<Mutation> mutations);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/IndexedKeyValue.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/IndexedKeyValue.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/IndexedKeyValue.java
index b04cf0a..f2b3b98 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/IndexedKeyValue.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/IndexedKeyValue.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -66,14 +66,6 @@ public class IndexedKeyValue extends KeyValue {
return mutation;
}
- /*
- * Returns a faked column family for an IndexedKeyValue instance
- */
- @Override
- public byte [] getFamily() {
- return WALEdit.METAFAMILY;
- }
-
@Override
public byte[] getFamilyArray() {
return WALEdit.METAFAMILY;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/KeyValueCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/KeyValueCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/KeyValueCodec.java
index 682a504..d02d431 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/KeyValueCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/KeyValueCodec.java
@@ -25,9 +25,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
/**
* Codec to encode/decode {@link KeyValue}s and {@link IndexedKeyValue}s within a {@link WALEdit}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index dc26d5a..887a04c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -1136,10 +1136,10 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
* since we can build the corresponding index row key.
*/
public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ImmutableBytesWritable dataRowKeyPtr, long ts) throws IOException {
- return buildDeleteMutation(kvBuilder, null, dataRowKeyPtr, Collections.<KeyValue>emptyList(), ts, null, null);
+ return buildDeleteMutation(kvBuilder, null, dataRowKeyPtr, Collections.<Cell>emptyList(), ts, null, null);
}
- public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ValueGetter oldState, ImmutableBytesWritable dataRowKeyPtr, Collection<KeyValue> pendingUpdates, long ts, byte[] regionStartKey, byte[] regionEndKey) throws IOException {
+ public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ValueGetter oldState, ImmutableBytesWritable dataRowKeyPtr, Collection<Cell> pendingUpdates, long ts, byte[] regionStartKey, byte[] regionEndKey) throws IOException {
byte[] indexRowKey = this.buildRowKey(oldState, dataRowKeyPtr, regionStartKey, regionEndKey, ts);
// Delete the entire row if any of the indexed columns changed
DeleteType deleteType = null;
@@ -1169,7 +1169,10 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
// Delete columns for missing key values
for (Cell kv : pendingUpdates) {
if (kv.getTypeByte() != KeyValue.Type.Put.getCode()) {
- ColumnReference ref = new ColumnReference(kv.getFamily(), kv.getQualifier());
+ ColumnReference ref =
+ new ColumnReference(kv.getFamilyArray(), kv.getFamilyOffset(),
+ kv.getFamilyLength(), kv.getQualifierArray(),
+ kv.getQualifierOffset(), kv.getQualifierLength());
if (dataTableColRefs.contains(ref)) {
if (delete == null) {
delete = new Delete(indexRowKey);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
index 679c5df..5b76572 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
@@ -30,8 +30,8 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
@@ -221,7 +221,7 @@ public class PhoenixIndexBuilder extends NonTxIndexBuilder {
// ordered correctly). We only need the list sorted if the expressions are going to be
// executed, not when the outer loop is exited. Hence we do it here, at the top of the loop.
if (flattenedCells != null) {
- Collections.sort(flattenedCells,KeyValue.COMPARATOR);
+ Collections.sort(flattenedCells,CellComparatorImpl.COMPARATOR);
}
PRow row = table.newRow(GenericKeyValueBuilder.INSTANCE, ts, ptr, false);
int adjust = table.getBucketNum() == null ? 1 : 2;
@@ -272,7 +272,7 @@ public class PhoenixIndexBuilder extends NonTxIndexBuilder {
transferAttributes(inc, delete);
mutations.add(delete);
}
- delete.addDeleteMarker(cell);
+ delete.add(cell);
}
}
return mutations;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 9847205..4641a8d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -87,6 +87,7 @@ import org.apache.phoenix.transaction.PhoenixTransactionContext;
import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
import org.apache.phoenix.transaction.PhoenixTransactionalTable;
import org.apache.phoenix.transaction.TransactionFactory;
+import org.apache.phoenix.util.PhoenixKeyValueUtil;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
@@ -517,7 +518,7 @@ public class PhoenixTransactionalIndexer implements RegionObserver, RegionCoproc
private final long currentTimestamp;
private final RegionCoprocessorEnvironment env;
private final Map<String, byte[]> attributes;
- private final List<KeyValue> pendingUpdates;
+ private final List<Cell> pendingUpdates;
private final Set<ColumnReference> indexedColumns;
private final Map<ColumnReference, ImmutableBytesWritable> valueMap;
@@ -533,8 +534,7 @@ public class PhoenixTransactionalIndexer implements RegionObserver, RegionCoproc
try {
CellScanner scanner = mutation.cellScanner();
while (scanner.advance()) {
- Cell cell = scanner.current();
- pendingUpdates.add(KeyValueUtil.ensureKeyValue(cell));
+ pendingUpdates.add(scanner.current());
}
} catch (IOException e) {
throw new RuntimeException(e); // Impossible
@@ -604,7 +604,7 @@ public class PhoenixTransactionalIndexer implements RegionObserver, RegionCoproc
}
@Override
- public Collection<KeyValue> getPendingUpdate() {
+ public Collection<Cell> getPendingUpdate() {
return pendingUpdates;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java
index 84d29ff..1d88c9c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java
@@ -24,12 +24,12 @@ import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
import java.sql.SQLException;
import java.util.List;
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.expression.aggregator.Aggregator;
import org.apache.phoenix.expression.aggregator.Aggregators;
import org.apache.phoenix.schema.tuple.Tuple;
-import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.PhoenixKeyValueUtil;
/**
*
@@ -57,7 +57,7 @@ public abstract class BaseGroupedAggregatingResultIterator implements
}
protected abstract ImmutableBytesWritable getGroupingKey(Tuple tuple, ImmutableBytesWritable ptr) throws SQLException;
- protected abstract Tuple wrapKeyValueAsResult(KeyValue keyValue) throws SQLException;
+ protected abstract Tuple wrapKeyValueAsResult(Cell keyValue) throws SQLException;
@Override
public Tuple next() throws SQLException {
@@ -80,7 +80,7 @@ public abstract class BaseGroupedAggregatingResultIterator implements
}
byte[] value = aggregators.toBytes(rowAggregators);
- Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(currentKey, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
+ Tuple tuple = wrapKeyValueAsResult(PhoenixKeyValueUtil.newKeyValue(currentKey, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
currentKey.set(nextKey.get(), nextKey.getOffset(), nextKey.getLength());
return tuple;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java
index 1cf9b73..c0553fa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java
@@ -19,7 +19,7 @@ package org.apache.phoenix.iterate;
import java.sql.SQLException;
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.expression.aggregator.Aggregators;
import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
@@ -58,7 +58,7 @@ public class GroupedAggregatingResultIterator extends BaseGroupedAggregatingResu
}
@Override
- protected Tuple wrapKeyValueAsResult(KeyValue keyValue) throws SQLException {
+ protected Tuple wrapKeyValueAsResult(Cell keyValue) throws SQLException {
return new SingleKeyValueTuple(keyValue);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferSortedQueue.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferSortedQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferSortedQueue.java
index ae2f452..529a0c5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferSortedQueue.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferSortedQueue.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Queue;
+
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -30,6 +31,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.iterate.OrderedResultIterator.ResultEntry;
import org.apache.phoenix.schema.tuple.ResultTuple;
import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.PhoenixKeyValueUtil;
import org.apache.phoenix.util.ResultUtil;
import com.google.common.collect.MinMaxPriorityQueue;
@@ -82,7 +84,6 @@ public class MappedByteBufferSortedQueue extends MappedByteBufferQueue<ResultEnt
return sizeof(e.sortKeys) + sizeof(toKeyValues(e));
}
- @SuppressWarnings("deprecation")
@Override
protected void writeToBuffer(MappedByteBuffer buffer, ResultEntry e) {
int totalLen = 0;
@@ -140,7 +141,7 @@ public class MappedByteBufferSortedQueue extends MappedByteBufferQueue<ResultEnt
int size = result.size();
List<KeyValue> kvs = new ArrayList<KeyValue>(size);
for (int i = 0; i < size; i++) {
- kvs.add(org.apache.hadoop.hbase.KeyValueUtil.ensureKeyValue(result.getValue(i)));
+ kvs.add(PhoenixKeyValueUtil.maybeCopyCell(result.getValue(i)));
}
return kvs;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
index ded33cc..c78280d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
@@ -245,11 +245,11 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
try {
Tuple tuple = iterator.next();
if (tuple == null && !isLastScan) {
- List<KeyValue> kvList = new ArrayList<KeyValue>(1);
+ List<Cell> kvList = new ArrayList<Cell>(1);
KeyValue kv = new KeyValue(QueryConstants.OFFSET_ROW_KEY_BYTES, QueryConstants.OFFSET_FAMILY,
QueryConstants.OFFSET_COLUMN, PInteger.INSTANCE.toBytes(iterator.getRemainingOffset()));
kvList.add(kv);
- Result r = new Result(kvList);
+ Result r = Result.create(kvList);
firstTuple = new ResultTuple(r);
} else {
firstTuple = tuple;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/iterate/RowKeyOrderedAggregateResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RowKeyOrderedAggregateResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RowKeyOrderedAggregateResultIterator.java
index 3c52e51..bb4e83b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RowKeyOrderedAggregateResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RowKeyOrderedAggregateResultIterator.java
@@ -29,7 +29,7 @@ import org.apache.phoenix.expression.aggregator.Aggregator;
import org.apache.phoenix.expression.aggregator.Aggregators;
import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
import org.apache.phoenix.schema.tuple.Tuple;
-import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.PhoenixKeyValueUtil;
import org.apache.phoenix.util.ServerUtil;
@@ -166,7 +166,7 @@ public class RowKeyOrderedAggregateResultIterator extends LookAheadResultIterato
current = previous;
} else {
byte[] value = aggregators.toBytes(rowAggregators);
- current = new SingleKeyValueTuple(KeyValueUtil.newKeyValue(previousKey, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
+ current = new SingleKeyValueTuple(PhoenixKeyValueUtil.newKeyValue(previousKey, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
}
}
if (current == null) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java
index e3d0987..0bf5982 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java
@@ -24,7 +24,7 @@ import java.sql.SQLException;
import org.apache.phoenix.expression.aggregator.Aggregators;
import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
import org.apache.phoenix.schema.tuple.Tuple;
-import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.PhoenixKeyValueUtil;
public class UngroupedAggregatingResultIterator extends GroupedAggregatingResultIterator {
@@ -43,7 +43,7 @@ public class UngroupedAggregatingResultIterator extends GroupedAggregatingResult
aggregators.reset(aggregators.getAggregators());
byte[] value = aggregators.toBytes(aggregators.getAggregators());
result = new SingleKeyValueTuple(
- KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY,
+ PhoenixKeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY,
SINGLE_COLUMN_FAMILY,
SINGLE_COLUMN,
AGG_TIMESTAMP,
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index c34d20d..7ca178b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -29,6 +29,7 @@ import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
@@ -66,7 +67,7 @@ import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PVarchar;
import org.apache.phoenix.util.ByteUtil;
-import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.PhoenixKeyValueUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.StringUtil;
@@ -615,7 +616,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
List<Cell> newCells = Lists.newArrayListWithCapacity(cells.size() + 1);
newCells.addAll(cells);
newCells.add(kv);
- Collections.sort(newCells, KeyValue.COMPARATOR);
+ Collections.sort(newCells, CellComparatorImpl.COMPARATOR);
tuple = new ResultTuple(Result.create(newCells));
}
return tuple;
@@ -1051,7 +1052,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
PTableType.TABLE.getValue().getBytes(),
PTableType.VIEW.getValue().getBytes());
for (byte[] tableType : tableTypes) {
- TABLE_TYPE_TUPLES.add(new SingleKeyValueTuple(KeyValueUtil.newKeyValue(tableType, TABLE_FAMILY_BYTES, TABLE_TYPE_BYTES, MetaDataProtocol.MIN_TABLE_TIMESTAMP, ByteUtil.EMPTY_BYTE_ARRAY)));
+ TABLE_TYPE_TUPLES.add(new SingleKeyValueTuple(PhoenixKeyValueUtil.newKeyValue(tableType, TABLE_FAMILY_BYTES, TABLE_TYPE_BYTES, MetaDataProtocol.MIN_TABLE_TIMESTAMP, ByteUtil.EMPTY_BYTE_ARRAY)));
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index d35cce1..6bbfd16 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -171,7 +171,7 @@ import org.apache.phoenix.schema.types.PVarchar;
import org.apache.phoenix.trace.util.Tracing;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.CursorUtil;
-import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.PhoenixKeyValueUtil;
import org.apache.phoenix.util.LogUtil;
import org.apache.phoenix.util.PhoenixContextExecutor;
import org.apache.phoenix.util.PhoenixRuntime;
@@ -587,20 +587,20 @@ public class PhoenixStatement implements Statement, SQLCloseable {
for (String planStep : planSteps) {
byte[] row = PVarchar.INSTANCE.toBytes(planStep);
List<Cell> cells = Lists.newArrayListWithCapacity(3);
- cells.add(KeyValueUtil.newKeyValue(row, EXPLAIN_PLAN_FAMILY, EXPLAIN_PLAN_COLUMN,
+ cells.add(PhoenixKeyValueUtil.newKeyValue(row, EXPLAIN_PLAN_FAMILY, EXPLAIN_PLAN_COLUMN,
MetaDataProtocol.MIN_TABLE_TIMESTAMP, ByteUtil.EMPTY_BYTE_ARRAY));
if (estimatedBytesToScan != null) {
- cells.add(KeyValueUtil.newKeyValue(row, EXPLAIN_PLAN_FAMILY, EXPLAIN_PLAN_BYTES_ESTIMATE,
+ cells.add(PhoenixKeyValueUtil.newKeyValue(row, EXPLAIN_PLAN_FAMILY, EXPLAIN_PLAN_BYTES_ESTIMATE,
MetaDataProtocol.MIN_TABLE_TIMESTAMP,
PLong.INSTANCE.toBytes(estimatedBytesToScan)));
}
if (estimatedRowsToScan != null) {
- cells.add(KeyValueUtil.newKeyValue(row, EXPLAIN_PLAN_FAMILY, EXPLAIN_PLAN_ROWS_ESTIMATE,
+ cells.add(PhoenixKeyValueUtil.newKeyValue(row, EXPLAIN_PLAN_FAMILY, EXPLAIN_PLAN_ROWS_ESTIMATE,
MetaDataProtocol.MIN_TABLE_TIMESTAMP,
PLong.INSTANCE.toBytes(estimatedRowsToScan)));
}
if (estimateInfoTimestamp != null) {
- cells.add(KeyValueUtil.newKeyValue(row, EXPLAIN_PLAN_FAMILY, EXPLAIN_PLAN_ESTIMATE_INFO_TS,
+ cells.add(PhoenixKeyValueUtil.newKeyValue(row, EXPLAIN_PLAN_FAMILY, EXPLAIN_PLAN_ESTIMATE_INFO_TS,
MetaDataProtocol.MIN_TABLE_TIMESTAMP,
PLong.INSTANCE.toBytes(estimateInfoTimestamp)));
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
index 360859e..3925bdb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
@@ -172,31 +172,31 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
return;
}
upsertExecutor.execute(ImmutableList.<RECORD>of(record));
- Map<Integer, List<KeyValue>> map = new HashMap<>();
- Iterator<Pair<byte[], List<KeyValue>>> uncommittedDataIterator
+ Map<Integer, List<Cell>> map = new HashMap<>();
+ Iterator<Pair<byte[], List<Cell>>> uncommittedDataIterator
= PhoenixRuntime.getUncommittedDataIterator(conn, true);
while (uncommittedDataIterator.hasNext()) {
- Pair<byte[], List<KeyValue>> kvPair = uncommittedDataIterator.next();
- List<KeyValue> keyValueList = kvPair.getSecond();
+ Pair<byte[], List<Cell>> kvPair = uncommittedDataIterator.next();
+ List<Cell> keyValueList = kvPair.getSecond();
keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), keyValueList);
byte[] first = kvPair.getFirst();
// Create a list of KV for each table
for (int i = 0; i < tableNames.size(); i++) {
if (Bytes.compareTo(Bytes.toBytes(tableNames.get(i)), first) == 0) {
if (!map.containsKey(i)) {
- map.put(i, new ArrayList<KeyValue>());
+ map.put(i, new ArrayList<Cell>());
}
- List<KeyValue> list = map.get(i);
- for (KeyValue kv : keyValueList) {
+ List<Cell> list = map.get(i);
+ for (Cell kv : keyValueList) {
list.add(kv);
}
break;
}
}
}
- for (Map.Entry<Integer, List<KeyValue>> rowEntry : map.entrySet()) {
+ for (Map.Entry<Integer, List<Cell>> rowEntry : map.entrySet()) {
int tableIndex = rowEntry.getKey();
- List<KeyValue> lkv = rowEntry.getValue();
+ List<Cell> lkv = rowEntry.getValue();
// All KV values combines to a single byte array
writeAggregatedRow(context, tableNames.get(tableIndex), lkv);
}
@@ -281,13 +281,13 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
* @throws InterruptedException
*/
- private void writeAggregatedRow(Context context, String tableName, List<KeyValue> lkv)
+ private void writeAggregatedRow(Context context, String tableName, List<Cell> lkv)
throws IOException, InterruptedException {
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
DataOutputStream outputStream = new DataOutputStream(bos);
ImmutableBytesWritable outputKey =null;
if (!lkv.isEmpty()) {
- for (KeyValue cell : lkv) {
+ for (Cell cell : lkv) {
if (outputKey == null || Bytes.compareTo(outputKey.get(), outputKey.getOffset(),
outputKey.getLength(), cell.getRowArray(), cell.getRowOffset(), cell
.getRowLength()) != 0) {
@@ -413,7 +413,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
ImportPreUpsertKeyValueProcessor {
@Override
- public List<KeyValue> preUpsert(byte[] rowKey, List<KeyValue> keyValues) {
+ public List<Cell> preUpsert(byte[] rowKey, List<Cell> keyValues) {
return keyValues;
}
}