You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2016/10/28 21:33:58 UTC
[05/35] phoenix git commit: PHOENIX-2935 IndexMetaData cache can
expire while a delete and/or query is running on the server
PHOENIX-2935 IndexMetaData cache can expire while a delete and/or query is running on the server
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d1156eaa
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d1156eaa
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d1156eaa
Branch: refs/heads/calcite
Commit: d1156eaadfc72d9bcfe6418fa98a170d27b18264
Parents: d507ee7
Author: Ankit Singhal <an...@gmail.com>
Authored: Wed Oct 12 14:10:15 2016 +0530
Committer: Ankit Singhal <an...@gmail.com>
Committed: Wed Oct 12 14:10:15 2016 +0530
----------------------------------------------------------------------
.../apache/phoenix/compile/DeleteCompiler.java | 8 +--
.../apache/phoenix/compile/UpsertCompiler.java | 51 ++++++++------------
.../UngroupedAggregateRegionObserver.java | 22 ++++++---
.../phoenix/index/PhoenixIndexMetaData.java | 7 +--
4 files changed, 44 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1156eaa/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 42efd68..e0881cf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -32,6 +32,7 @@ import java.util.Set;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.cache.ServerCacheClient;
import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
@@ -45,7 +46,6 @@ import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.execute.MutationState.RowMutationState;
import org.apache.phoenix.filter.SkipScanFilter;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.index.IndexMetaDataCacheClient;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -583,10 +583,10 @@ public class DeleteCompiler {
ServerCache cache = null;
try {
if (ptr.getLength() > 0) {
- IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
- cache = client.addIndexMetadataCache(context.getScanRanges(), ptr, txState);
- byte[] uuidValue = cache.getId();
+ byte[] uuidValue = ServerCacheClient.generateId();
context.getScan().setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+ context.getScan().setAttribute(PhoenixIndexCodec.INDEX_MD, ptr.get());
+ context.getScan().setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
}
ResultIterator iterator = aggPlan.iterator();
try {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1156eaa/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 26855aa..3f9e6b2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -33,14 +33,10 @@ import java.util.Set;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.client.HRegionLocator;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
+import org.apache.phoenix.cache.ServerCacheClient;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
@@ -56,7 +52,6 @@ import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.LiteralExpression;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.IndexMaintainer;
-import org.apache.phoenix.index.IndexMetaDataCacheClient;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -87,14 +82,13 @@ import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PColumnImpl;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.PTable.ViewType;
import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.ReadOnlyTableException;
import org.apache.phoenix.schema.SortOrder;
-import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.TypeMismatchException;
import org.apache.phoenix.schema.tuple.Tuple;
@@ -731,32 +725,27 @@ public class UpsertCompiler {
table.getIndexMaintainers(ptr, context.getConnection());
byte[] txState = table.isTransactional() ? connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY;
- ServerCache cache = null;
+ if (ptr.getLength() > 0) {
+ byte[] uuidValue = ServerCacheClient.generateId();
+ scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+ scan.setAttribute(PhoenixIndexCodec.INDEX_MD, ptr.get());
+ scan.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
+ }
+ ResultIterator iterator = aggPlan.iterator();
try {
- if (ptr.getLength() > 0) {
- IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
- cache = client.addIndexMetadataCache(context.getScanRanges(), ptr, txState);
- byte[] uuidValue = cache.getId();
- scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
- }
- ResultIterator iterator = aggPlan.iterator();
- try {
- Tuple row = iterator.next();
- final long mutationCount = (Long)aggProjector.getColumnProjector(0).getValue(row, PLong.INSTANCE, ptr);
- return new MutationState(maxSize, connection) {
- @Override
- public long getUpdateCount() {
- return mutationCount;
- }
- };
- } finally {
- iterator.close();
- }
+ Tuple row = iterator.next();
+ final long mutationCount = (Long)aggProjector.getColumnProjector(0).getValue(row,
+ PLong.INSTANCE, ptr);
+ return new MutationState(maxSize, connection) {
+ @Override
+ public long getUpdateCount() {
+ return mutationCount;
+ }
+ };
} finally {
- if (cache != null) {
- cache.close();
- }
+ iterator.close();
}
+
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1156eaa/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index d8d313d..0f175c5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -177,8 +177,15 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
this.kvBuilder = GenericKeyValueBuilder.INSTANCE;
}
- private void commitBatch(Region region, List<Mutation> mutations, byte[] indexUUID,
- long blockingMemstoreSize) throws IOException {
+ private void commitBatch(Region region, List<Mutation> mutations, byte[] indexUUID, long blockingMemstoreSize,
+ byte[] indexMaintainersPtr, byte[] txState) throws IOException {
+ if (indexMaintainersPtr != null) {
+ mutations.get(0).setAttribute(PhoenixIndexCodec.INDEX_MD, indexMaintainersPtr);
+ }
+
+ if (txState != null) {
+ mutations.get(0).setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
+ }
if (indexUUID != null) {
for (Mutation m : mutations) {
m.setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID);
@@ -291,6 +298,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
RegionScanner theScanner = s;
byte[] indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID);
+ byte[] txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE);
List<Expression> selectExpressions = null;
byte[] upsertSelectTable = scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_TABLE);
boolean isUpsert = false;
@@ -373,6 +381,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
long rowCount = 0;
final RegionScanner innerScanner = theScanner;
+ byte[] indexMaintainersPtr = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
boolean acquiredLock = false;
try {
if(needToWrite) {
@@ -596,13 +605,14 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
// Commit in batches based on UPSERT_BATCH_SIZE_ATTRIB in config
if (!mutations.isEmpty() && batchSize > 0 &&
mutations.size() % batchSize == 0) {
- commitBatch(region, mutations, indexUUID, blockingMemStoreSize);
+ commitBatch(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr,
+ txState);
mutations.clear();
}
// Commit in batches based on UPSERT_BATCH_SIZE_ATTRIB in config
if (!indexMutations.isEmpty() && batchSize > 0 &&
indexMutations.size() % batchSize == 0) {
- commitBatch(region, indexMutations, null, blockingMemStoreSize);
+ commitBatch(region, indexMutations, null, blockingMemStoreSize, null, txState);
indexMutations.clear();
}
} catch (ConstraintViolationException e) {
@@ -618,11 +628,11 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
} while (hasMore);
if (!mutations.isEmpty()) {
- commitBatch(region,mutations, indexUUID, blockingMemStoreSize);
+ commitBatch(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, txState);
}
if (!indexMutations.isEmpty()) {
- commitBatch(region,indexMutations, null, blockingMemStoreSize);
+ commitBatch(region, indexMutations, null, blockingMemStoreSize, indexMaintainersPtr, txState);
indexMutations.clear();
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1156eaa/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
index 818713b..d22e957 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
@@ -74,9 +74,10 @@ public class PhoenixIndexMetaData implements IndexMetaData {
TenantCache cache = GlobalCache.getTenantCache(env, tenantId);
IndexMetaDataCache indexCache = (IndexMetaDataCache)cache.getServerCache(new ImmutableBytesPtr(uuid));
if (indexCache == null) {
- String msg = "key=" + ServerCacheClient.idToString(uuid) + " region=" + env.getRegion();
- SQLException e = new SQLExceptionInfo.Builder(SQLExceptionCode.INDEX_METADATA_NOT_FOUND)
- .setMessage(msg).build().buildException();
+ String msg = "key=" + ServerCacheClient.idToString(uuid) + " region=" + env.getRegion() + "host="
+ + env.getRegionServerServices().getServerName();
+ SQLException e = new SQLExceptionInfo.Builder(SQLExceptionCode.INDEX_METADATA_NOT_FOUND).setMessage(msg)
+ .build().buildException();
ServerUtil.throwIOException("Index update failed", e); // will not return
}
return indexCache;