You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2017/02/03 02:21:56 UTC
[3/3] phoenix git commit: PHOENIX-3444 Make changes to
IndexMaintainer backward compatible
PHOENIX-3444 Make changes to IndexMaintainer backward compatible
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/42d04927
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/42d04927
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/42d04927
Branch: refs/heads/encodecolumns2
Commit: 42d049275a1c6e900d23edda7387e3d87ec3cb15
Parents: a4b58c0
Author: Samarth <sa...@salesforce.com>
Authored: Thu Feb 2 18:21:43 2017 -0800
Committer: Samarth <sa...@salesforce.com>
Committed: Thu Feb 2 18:21:43 2017 -0800
----------------------------------------------------------------------
.../apache/phoenix/cache/ServerCacheClient.java | 2 +
.../org/apache/phoenix/cache/TenantCache.java | 2 +-
.../apache/phoenix/cache/TenantCacheImpl.java | 4 +-
.../apache/phoenix/compile/DeleteCompiler.java | 2 +-
.../compile/PostLocalIndexDDLCompiler.java | 2 +-
.../apache/phoenix/compile/UpsertCompiler.java | 2 +-
.../coprocessor/BaseScannerRegionObserver.java | 5 +
.../GroupedAggregateRegionObserver.java | 9 +-
.../coprocessor/MetaDataEndpointImpl.java | 5 +-
.../coprocessor/MetaDataRegionObserver.java | 2 +-
.../phoenix/coprocessor/ScanRegionObserver.java | 9 +-
.../coprocessor/ServerCachingEndpointImpl.java | 2 +-
.../coprocessor/ServerCachingProtocol.java | 2 +-
.../UngroupedAggregateRegionObserver.java | 29 +-
.../generated/ServerCachingProtos.java | 5125 +++++++++++++++++-
.../apache/phoenix/execute/BaseQueryPlan.java | 2 +-
.../apache/phoenix/execute/MutationState.java | 2 +-
.../apache/phoenix/index/IndexMaintainer.java | 446 +-
.../phoenix/index/IndexMetaDataCacheClient.java | 1 +
.../index/IndexMetaDataCacheFactory.java | 4 +-
.../apache/phoenix/index/PhoenixIndexCodec.java | 1 +
.../phoenix/index/PhoenixIndexMetaData.java | 9 +-
.../apache/phoenix/join/HashCacheFactory.java | 2 +-
.../apache/phoenix/schema/MetaDataClient.java | 12 +-
.../org/apache/phoenix/schema/PColumnImpl.java | 5 +
.../tuple/EncodedColumnQualiferCellsList.java | 5 +
.../apache/phoenix/util/EncodedColumnsUtil.java | 21 +-
.../java/org/apache/phoenix/util/IndexUtil.java | 5 +
.../apache/phoenix/cache/TenantCacheTest.java | 6 +-
.../phoenix/index/IndexMaintainerTest.java | 4 +-
.../src/main/ServerCachingService.proto | 35 +
31 files changed, 5541 insertions(+), 221 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
index 0383251..18e4034 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
@@ -56,6 +56,7 @@ import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.AddServerCac
import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.RemoveServerCacheRequest;
import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.RemoveServerCacheResponse;
import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.ServerCachingService;
+import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.job.JobManager.JobCallable;
import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
@@ -215,6 +216,7 @@ public class ServerCacheClient {
}
builder.setCacheId(ByteStringer.wrap(cacheId));
builder.setCachePtr(org.apache.phoenix.protobuf.ProtobufUtil.toProto(cachePtr));
+ builder.setHasProtoBufIndexMaintainer(true);
ServerCacheFactoryProtos.ServerCacheFactory.Builder svrCacheFactoryBuider = ServerCacheFactoryProtos.ServerCacheFactory.newBuilder();
svrCacheFactoryBuider.setClassName(cacheFactory.getClass().getName());
builder.setCacheFactory(svrCacheFactoryBuider.build());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
index 5c33967..d30f5dd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
@@ -36,7 +36,7 @@ import org.apache.phoenix.memory.MemoryManager;
public interface TenantCache {
MemoryManager getMemoryManager();
Closeable getServerCache(ImmutableBytesPtr cacheId);
- Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory) throws SQLException;
+ Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory, boolean useProtoForIndexMaintainer) throws SQLException;
void removeServerCache(ImmutableBytesPtr cacheId);
void removeAllServerCache();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
index 658b4cc..3d178f6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
@@ -104,11 +104,11 @@ public class TenantCacheImpl implements TenantCache {
}
@Override
- public Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory) throws SQLException {
+ public Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory, boolean useProtoForIndexMaintainer) throws SQLException {
MemoryChunk chunk = this.getMemoryManager().allocate(cachePtr.getLength() + txState.length);
boolean success = false;
try {
- Closeable element = cacheFactory.newCache(cachePtr, txState, chunk);
+ Closeable element = cacheFactory.newCache(cachePtr, txState, chunk, useProtoForIndexMaintainer);
getServerCaches().put(cacheId, element);
success = true;
return element;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 602cd6b..cee545a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -585,7 +585,7 @@ public class DeleteCompiler {
if (ptr.getLength() > 0) {
byte[] uuidValue = ServerCacheClient.generateId();
context.getScan().setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
- context.getScan().setAttribute(PhoenixIndexCodec.INDEX_MD, ptr.get());
+ context.getScan().setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get());
context.getScan().setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
}
ResultIterator iterator = aggPlan.iterator();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java
index 843ed68..7e3c3b2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java
@@ -78,7 +78,7 @@ public class PostLocalIndexDDLCompiler {
// rows per region as a result. The value of the attribute will be our persisted
// index maintainers.
// Define the LOCAL_INDEX_BUILD as a new static in BaseScannerRegionObserver
- scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD, ByteUtil.copyKeyBytesIfNecessary(ptr));
+ scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO, ByteUtil.copyKeyBytesIfNecessary(ptr));
// By default, we'd use a FirstKeyOnly filter as nothing else needs to be projected for count(*).
// However, in this case, we need to project all of the data columns that contribute to the index.
IndexMaintainer indexMaintainer = index.getIndexMaintainer(dataTable, connection);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index fbe07df..27b72a8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -734,7 +734,7 @@ public class UpsertCompiler {
if (ptr.getLength() > 0) {
byte[] uuidValue = ServerCacheClient.generateId();
scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
- scan.setAttribute(PhoenixIndexCodec.INDEX_MD, ptr.get());
+ scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get());
scan.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
}
ResultIterator iterator = aggPlan.iterator();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 06e4f53..1c479c5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -84,6 +84,11 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
public static final String GROUP_BY_LIMIT = "_GroupByLimit";
public static final String LOCAL_INDEX = "_LocalIndex";
public static final String LOCAL_INDEX_BUILD = "_LocalIndexBuild";
+ /*
+ * Attribute to denote that the index maintainer has been serialized using its protobuf representation. Its
+ * name must differ from LOCAL_INDEX_BUILD, otherwise readers that probe this attribute first cannot tell
+ * the two formats apart. Needed for backward compatibility. TODO: get rid of this in next major release.
+ */
+ public static final String LOCAL_INDEX_BUILD_PROTO = "_LocalIndexBuildProto";
public static final String LOCAL_INDEX_JOIN_SCHEMA = "_LocalIndexJoinSchema";
public static final String DATA_TABLE_COLUMNS_TO_JOIN = "_DataTableColumnsToJoin";
public static final String COLUMNS_STORED_IN_SINGLE_CELL = "_ColumnsStoredInSingleCell";
http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index 658bb82..9e82749 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -128,8 +128,13 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
RegionScanner innerScanner = s;
- byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
- List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes);
+ boolean useProto = false;
+ byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD_PROTO);
+ useProto = localIndexBytes != null;
+ if (localIndexBytes == null) {
+ localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
+ }
+ List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes, useProto);
TupleProjector tupleProjector = null;
byte[][] viewConstants = null;
ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 84de00d..3e882e6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -160,6 +160,7 @@ import org.apache.phoenix.expression.LiteralExpression;
import org.apache.phoenix.expression.ProjectedColumnExpression;
import org.apache.phoenix.expression.RowKeyColumnExpression;
import org.apache.phoenix.expression.visitor.StatelessTraverseAllExpressionVisitor;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
@@ -1472,7 +1473,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
// tableMetadata and set the view statement and partition column correctly
if (parentTable!=null && parentTable.getAutoPartitionSeqName()!=null) {
long autoPartitionNum = 1;
- final Properties props = new Properties();
try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class);
Statement stmt = connection.createStatement()) {
String seqName = parentTable.getAutoPartitionSeqName();
@@ -3286,8 +3286,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
SchemaUtil.getTableKey(tenantId, index.getSchemaName().getBytes(), index
.getTableName().getBytes());
Pair<String, String> columnToDeleteInfo = new Pair<>(columnToDelete.getFamilyName().getString(), columnToDelete.getName().getString());
+ ColumnReference colDropRef = new ColumnReference(columnToDelete.getFamilyName().getBytes(), columnToDelete.getColumnQualifierBytes());
boolean isColumnIndexed = indexMaintainer.getIndexedColumnInfo().contains(columnToDeleteInfo);
- boolean isCoveredColumn = indexMaintainer.getCoveredColumnInfo().contains(columnToDeleteInfo);
+ boolean isCoveredColumn = indexMaintainer.getCoveredColumns().contains(colDropRef);
// If index requires this column for its pk, then drop it
if (isColumnIndexed) {
// Since we're dropping the index, lock it to ensure
http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index 7b98edb..5620d6a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -363,7 +363,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
conn);
byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
- dataTableScan.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+ dataTableScan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue);
MutationState mutationState = plan.execute();
long rowCount = mutationState.getUpdateCount();
LOG.info(rowCount + " rows of index which are rebuild");
http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
index ad5c84c..74d1dd3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -214,8 +214,13 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
if (dataColumns != null) {
tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns);
dataRegion = c.getEnvironment().getRegion();
- byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
- List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes);
+ boolean useProto = false;
+ byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD_PROTO);
+ useProto = localIndexBytes != null;
+ if (localIndexBytes == null) {
+ localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
+ }
+ List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes, useProto);
indexMaintainer = indexMaintainers.get(0);
viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
byte[] txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
index bf889d5..98f57ad 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
@@ -72,7 +72,7 @@ public class ServerCachingEndpointImpl extends ServerCachingService implements C
(Class<ServerCacheFactory>) Class.forName(request.getCacheFactory().getClassName());
ServerCacheFactory cacheFactory = serverCacheFactoryClass.newInstance();
tenantCache.addServerCache(new ImmutableBytesPtr(request.getCacheId().toByteArray()),
- cachePtr, txState, cacheFactory);
+ cachePtr, txState, cacheFactory, request.hasHasProtoBufIndexMaintainer() && request.getHasProtoBufIndexMaintainer());
} catch (Throwable e) {
ProtobufUtil.setControllerException(controller, new IOException(e));
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java
index b201c8e..139a69c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java
@@ -36,7 +36,7 @@ import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
*/
public interface ServerCachingProtocol {
public static interface ServerCacheFactory extends Writable {
- public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] txState, MemoryChunk chunk) throws SQLException;
+ public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] txState, MemoryChunk chunk, boolean useProtoForIndexMaintainer) throws SQLException;
}
/**
* Add the cache to the region server cache.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 6289f5a..b6a0a6b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -184,7 +184,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
private void commitBatch(HRegion region, List<Mutation> mutations, byte[] indexUUID, long blockingMemstoreSize,
byte[] indexMaintainersPtr, byte[] txState) throws IOException {
if (indexMaintainersPtr != null) {
- mutations.get(0).setAttribute(PhoenixIndexCodec.INDEX_MD, indexMaintainersPtr);
+ mutations.get(0).setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, indexMaintainersPtr);
}
if (txState != null) {
@@ -294,8 +294,13 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
values = new byte[projectedTable.getPKColumns().size()][];
}
- byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
- List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes);
+ boolean useProto = false;
+ byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD_PROTO);
+ useProto = localIndexBytes != null;
+ if (localIndexBytes == null) {
+ localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
+ }
+ List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes, useProto);
List<Mutation> indexMutations = localIndexBytes == null ? Collections.<Mutation>emptyList() : Lists.<Mutation>newArrayListWithExpectedSize(1024);
RegionScanner theScanner = s;
@@ -388,7 +393,11 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
long rowCount = 0;
final RegionScanner innerScanner = theScanner;
- byte[] indexMaintainersPtr = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
+ byte[] indexMaintainersPtr = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
+ // for backward compatibility fall back to look up by the old attribute
+ if (indexMaintainersPtr == null) {
+ indexMaintainersPtr = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
+ }
boolean acquiredLock = false;
try {
if(needToWrite) {
@@ -726,7 +735,13 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
private RegionScanner rebuildIndices(final RegionScanner innerScanner, final HRegion region, final Scan scan,
Configuration config) throws IOException {
- byte[] indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
+ byte[] indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
+ boolean useProto = true;
+ // for backward compatibility fall back to look up by the old attribute
+ if (indexMetaData == null) {
+ useProto = false;
+ indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
+ }
boolean hasMore;
long rowCount = 0;
try {
@@ -746,7 +761,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
if (put == null) {
put = new Put(CellUtil.cloneRow(cell));
- put.setAttribute(PhoenixIndexCodec.INDEX_MD, indexMetaData);
+ put.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
put.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS,
PDataType.TRUE_BYTES);
@@ -756,7 +771,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
} else {
if (del == null) {
del = new Delete(CellUtil.cloneRow(cell));
- del.setAttribute(PhoenixIndexCodec.INDEX_MD, indexMetaData);
+ del.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
del.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS,
PDataType.TRUE_BYTES);