You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2015/03/25 19:57:46 UTC
phoenix git commit: Push transaction state to server for secondary
indexing
Repository: phoenix
Updated Branches:
refs/heads/txn 0d44dd806 -> 826ebf5ce
Push transaction state to server for secondary indexing
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/826ebf5c
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/826ebf5c
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/826ebf5c
Branch: refs/heads/txn
Commit: 826ebf5ce741342ed1c758e594ca01bc9cb8e036
Parents: 0d44dd8
Author: James Taylor <jt...@salesforce.com>
Authored: Wed Mar 25 11:54:44 2015 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Mar 25 11:55:11 2015 -0700
----------------------------------------------------------------------
.../phoenix/cache/IndexMetaDataCache.java | 22 ++++
.../apache/phoenix/cache/ServerCacheClient.java | 3 +-
.../org/apache/phoenix/cache/TenantCache.java | 2 +-
.../apache/phoenix/cache/TenantCacheImpl.java | 13 +-
.../apache/phoenix/compile/DeleteCompiler.java | 8 +-
.../apache/phoenix/compile/UpsertCompiler.java | 8 +-
.../coprocessor/BaseScannerRegionObserver.java | 14 ++-
.../phoenix/coprocessor/ScanRegionObserver.java | 11 +-
.../coprocessor/ServerCachingEndpointImpl.java | 8 +-
.../coprocessor/ServerCachingProtocol.java | 5 +-
.../generated/ServerCachingProtos.java | 120 +++++++++++++++++--
.../apache/phoenix/execute/BaseQueryPlan.java | 17 ++-
.../apache/phoenix/execute/MutationState.java | 5 +-
.../hbase/index/builder/IndexBuilder.java | 9 ++
.../hbase/index/scanner/ScannerBuilder.java | 5 +-
.../phoenix/index/IndexMetaDataCacheClient.java | 10 +-
.../index/IndexMetaDataCacheFactory.java | 18 ++-
.../phoenix/index/PhoenixIndexBuilder.java | 2 +-
.../apache/phoenix/index/PhoenixIndexCodec.java | 53 ++++++--
.../apache/phoenix/jdbc/PhoenixConnection.java | 4 +
.../apache/phoenix/join/HashCacheClient.java | 3 +-
.../apache/phoenix/join/HashCacheFactory.java | 2 +-
.../apache/phoenix/util/TransactionUtil.java | 8 +-
.../src/main/ServerCachingService.proto | 1 +
24 files changed, 278 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
index 48949f1..359e08f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
@@ -19,10 +19,32 @@
package org.apache.phoenix.cache;
import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
import java.util.List;
+import co.cask.tephra.Transaction;
+
import org.apache.phoenix.index.IndexMaintainer;
public interface IndexMetaDataCache extends Closeable {
+ public static final IndexMetaDataCache EMPTY_INDEX_META_DATA_CACHE = new IndexMetaDataCache() {
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public List<IndexMaintainer> getIndexMaintainers() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public Transaction getTransaction() {
+ return null;
+ }
+
+ };
public List<IndexMaintainer> getIndexMaintainers();
+ public Transaction getTransaction();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
index 1233e1c..76bedbc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
@@ -142,7 +142,7 @@ public class ServerCacheClient {
}
- public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final ServerCacheFactory cacheFactory, final TableRef cacheUsingTableRef) throws SQLException {
+ public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final byte[] txState, final ServerCacheFactory cacheFactory, final TableRef cacheUsingTableRef) throws SQLException {
ConnectionQueryServices services = connection.getQueryServices();
MemoryChunk chunk = services.getMemoryManager().allocate(cachePtr.getLength());
List<Closeable> closeables = new ArrayList<Closeable>();
@@ -201,6 +201,7 @@ public class ServerCacheClient {
ServerCacheFactoryProtos.ServerCacheFactory.Builder svrCacheFactoryBuider = ServerCacheFactoryProtos.ServerCacheFactory.newBuilder();
svrCacheFactoryBuider.setClassName(cacheFactory.getClass().getName());
builder.setCacheFactory(svrCacheFactoryBuider.build());
+ builder.setTxState(HBaseZeroCopyByteString.wrap(txState));
instance.addServerCache(controller, builder.build(), rpcCallback);
if(controller.getFailedOn() != null) {
throw controller.getFailedOn();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
index b968a9b..c7cd58f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
@@ -37,6 +37,6 @@ import org.apache.phoenix.memory.MemoryManager;
public interface TenantCache {
MemoryManager getMemoryManager();
Closeable getServerCache(ImmutableBytesPtr cacheId);
- Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, ServerCacheFactory cacheFactory) throws SQLException;
+ Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory) throws SQLException;
void removeServerCache(ImmutableBytesPtr cacheId) throws SQLException;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
index 9005fa8..6ef7a6f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
@@ -23,14 +23,17 @@ import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
-import com.google.common.cache.*;
import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.memory.MemoryManager;
import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
import org.apache.phoenix.util.Closeables;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+
/**
*
* Cache per tenant on server side. Tracks memory usage for each
@@ -80,11 +83,11 @@ public class TenantCacheImpl implements TenantCache {
}
@Override
- public Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, ServerCacheFactory cacheFactory) throws SQLException {
- MemoryChunk chunk = this.getMemoryManager().allocate(cachePtr.getLength());
+ public Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory) throws SQLException {
+ MemoryChunk chunk = this.getMemoryManager().allocate(cachePtr.getLength() + txState.length);
boolean success = false;
try {
- Closeable element = cacheFactory.newCache(cachePtr, chunk);
+ Closeable element = cacheFactory.newCache(cachePtr, txState, chunk);
getServerCaches().put(cacheId, element);
success = true;
return element;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 273024c..a0369d5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -75,8 +75,10 @@ import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.TransactionUtil;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -490,12 +492,14 @@ public class DeleteCompiler {
public MutationState execute() throws SQLException {
// TODO: share this block of code with UPSERT SELECT
ImmutableBytesWritable ptr = context.getTempPtr();
- tableRef.getTable().getIndexMaintainers(ptr, context.getConnection());
+ PTable table = tableRef.getTable();
+ table.getIndexMaintainers(ptr, context.getConnection());
+ byte[] txState = table.isTransactional() ? TransactionUtil.encodeTxnState(connection.getTransactionContext().getCurrentTransaction()) : ByteUtil.EMPTY_BYTE_ARRAY;
ServerCache cache = null;
try {
if (ptr.getLength() > 0) {
IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
- cache = client.addIndexMetadataCache(context.getScanRanges(), ptr);
+ cache = client.addIndexMetadataCache(context.getScanRanges(), ptr, txState);
byte[] uuidValue = cache.getId();
context.getScan().setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 5fd1602..e72b634 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -89,6 +89,7 @@ import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TransactionUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -607,12 +608,15 @@ public class UpsertCompiler {
@Override
public MutationState execute() throws SQLException {
ImmutableBytesWritable ptr = context.getTempPtr();
- tableRef.getTable().getIndexMaintainers(ptr, context.getConnection());
+ PTable table = tableRef.getTable();
+ table.getIndexMaintainers(ptr, context.getConnection());
+ byte[] txState = table.isTransactional() ? TransactionUtil.encodeTxnState(connection.getTransactionContext().getCurrentTransaction()) : ByteUtil.EMPTY_BYTE_ARRAY;
+
ServerCache cache = null;
try {
if (ptr.getLength() > 0) {
IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
- cache = client.addIndexMetadataCache(context.getScanRanges(), ptr);
+ cache = client.addIndexMetadataCache(context.getScanRanges(), ptr, txState);
byte[] uuidValue = cache.getId();
scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 25ac408..c4eb4f7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -21,6 +21,8 @@ import java.io.IOException;
import java.util.List;
import java.util.Set;
+import co.cask.tephra.Transaction;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
@@ -85,6 +87,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
public static final String EXPECTED_UPPER_REGION_KEY = "_ExpectedUpperRegionKey";
public static final String REVERSE_SCAN = "_ReverseScan";
public static final String ANALYZE_TABLE = "_ANALYZETABLE";
+ public static final String TX_STATE = "_TxState";
/**
* Attribute name used to pass custom annotations in Scans and Mutations (later). Custom annotations
* are used to augment log lines emitted by Phoenix. See https://issues.apache.org/jira/browse/PHOENIX-1198.
@@ -222,7 +225,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
final byte[][] viewConstants, final TupleProjector projector,
final ImmutableBytesWritable ptr) {
return getWrappedScanner(c, s, null, null, offset, scan, dataColumns, tupleProjector,
- dataRegion, indexMaintainer, viewConstants, null, null, projector, ptr);
+ dataRegion, indexMaintainer, null, viewConstants, null, null, projector, ptr);
}
/**
@@ -230,13 +233,14 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
* re-throws as DoNotRetryIOException to prevent needless retrying hanging the query
* for 30 seconds. Unfortunately, until HBASE-7481 gets fixed, there's no way to do
* the same from a custom filter.
- * @param arrayFuncRefs
* @param arrayKVRefs
+ * @param arrayFuncRefs
* @param offset starting position in the rowkey.
* @param scan
* @param tupleProjector
* @param dataRegion
* @param indexMaintainer
+ * @param tx TODO
* @param viewConstants
*/
protected RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c,
@@ -244,9 +248,9 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
final Expression[] arrayFuncRefs, final int offset, final Scan scan,
final ColumnReference[] dataColumns, final TupleProjector tupleProjector,
final HRegion dataRegion, final IndexMaintainer indexMaintainer,
- final byte[][] viewConstants, final KeyValueSchema kvSchema,
- final ValueBitSet kvSchemaBitSet, final TupleProjector projector,
- final ImmutableBytesWritable ptr) {
+ Transaction tx, final byte[][] viewConstants,
+ final KeyValueSchema kvSchema, final ValueBitSet kvSchemaBitSet,
+ final TupleProjector projector, final ImmutableBytesWritable ptr) {
return new RegionScanner() {
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
index ddde407..04275b2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -26,7 +26,7 @@ import java.sql.SQLException;
import java.util.List;
import java.util.Set;
-import com.google.common.collect.Sets;
+import co.cask.tephra.Transaction;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -58,8 +58,10 @@ import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.TransactionUtil;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
/**
@@ -190,6 +192,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
HRegion dataRegion = null;
IndexMaintainer indexMaintainer = null;
byte[][] viewConstants = null;
+ Transaction tx = null;
ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan);
if (dataColumns != null) {
tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns);
@@ -198,14 +201,16 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes);
indexMaintainer = indexMaintainers.get(0);
viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
+ byte[] txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE);
+ tx = TransactionUtil.decodeTxnState(txState);
}
final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
innerScanner =
getWrappedScanner(c, innerScanner, arrayKVRefs, arrayFuncRefs, offset, scan,
- dataColumns, tupleProjector, dataRegion, indexMaintainer, viewConstants,
- kvSchema, kvSchemaBitSet, j == null ? p : null, ptr);
+ dataColumns, tupleProjector, dataRegion, indexMaintainer, tx,
+ viewConstants, kvSchema, kvSchemaBitSet, j == null ? p : null, ptr);
final ImmutableBytesWritable tenantId = ScanUtil.getTenantId(scan);
if (j != null) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
index 2f31b08..9f3bdb4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
@@ -26,18 +26,17 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
import org.apache.phoenix.cache.GlobalCache;
import org.apache.phoenix.cache.TenantCache;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-
import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.AddServerCacheRequest;
import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.AddServerCacheResponse;
import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.RemoveServerCacheRequest;
import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.RemoveServerCacheResponse;
import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.ServerCachingService;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.protobuf.ProtobufUtil;
+import org.apache.phoenix.util.ByteUtil;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
@@ -66,6 +65,7 @@ public class ServerCachingEndpointImpl extends ServerCachingService implements C
ImmutableBytesWritable cachePtr =
org.apache.phoenix.protobuf.ProtobufUtil
.toImmutableBytesWritable(request.getCachePtr());
+ byte[] txState = request.hasTxState() ? request.getTxState().toByteArray() : ByteUtil.EMPTY_BYTE_ARRAY;
try {
@SuppressWarnings("unchecked")
@@ -73,7 +73,7 @@ public class ServerCachingEndpointImpl extends ServerCachingService implements C
(Class<ServerCacheFactory>) Class.forName(request.getCacheFactory().getClassName());
ServerCacheFactory cacheFactory = serverCacheFactoryClass.newInstance();
tenantCache.addServerCache(new ImmutableBytesPtr(request.getCacheId().toByteArray()),
- cachePtr, cacheFactory);
+ cachePtr, txState, cacheFactory);
} catch (Throwable e) {
ProtobufUtil.setControllerException(controller, new IOException(e));
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java
index 4fdfe99..b201c8e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java
@@ -36,18 +36,19 @@ import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
*/
public interface ServerCachingProtocol {
public static interface ServerCacheFactory extends Writable {
- public Closeable newCache(ImmutableBytesWritable cachePtr, MemoryChunk chunk) throws SQLException;
+ public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] txState, MemoryChunk chunk) throws SQLException;
}
/**
* Add the cache to the region server cache.
* @param tenantId the tenantId or null if not applicable
* @param cacheId unique identifier of the cache
* @param cachePtr pointer to the byte array of the cache
+ * @param txState TODO
* @param cacheFactory factory that converts from byte array to object representation on the server side
* @return true on success and otherwise throws
* @throws SQLException
*/
- public boolean addServerCache(byte[] tenantId, byte[] cacheId, ImmutableBytesWritable cachePtr, ServerCacheFactory cacheFactory) throws SQLException;
+ public boolean addServerCache(byte[] tenantId, byte[] cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory) throws SQLException;
/**
* Remove the cache from the region server cache. Called upon completion of
* the operation when cache is no longer needed.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java
index 69db21b..5ee1dfb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java
@@ -699,6 +699,16 @@ public final class ServerCachingProtos {
* <code>required .ServerCacheFactory cacheFactory = 4;</code>
*/
org.apache.phoenix.coprocessor.generated.ServerCacheFactoryProtos.ServerCacheFactoryOrBuilder getCacheFactoryOrBuilder();
+
+ // optional bytes txState = 5;
+ /**
+ * <code>optional bytes txState = 5;</code>
+ */
+ boolean hasTxState();
+ /**
+ * <code>optional bytes txState = 5;</code>
+ */
+ com.google.protobuf.ByteString getTxState();
}
/**
* Protobuf type {@code AddServerCacheRequest}
@@ -787,6 +797,11 @@ public final class ServerCachingProtos {
bitField0_ |= 0x00000008;
break;
}
+ case 42: {
+ bitField0_ |= 0x00000010;
+ txState_ = input.readBytes();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -903,11 +918,28 @@ public final class ServerCachingProtos {
return cacheFactory_;
}
+ // optional bytes txState = 5;
+ public static final int TXSTATE_FIELD_NUMBER = 5;
+ private com.google.protobuf.ByteString txState_;
+ /**
+ * <code>optional bytes txState = 5;</code>
+ */
+ public boolean hasTxState() {
+ return ((bitField0_ & 0x00000010) == 0x00000010);
+ }
+ /**
+ * <code>optional bytes txState = 5;</code>
+ */
+ public com.google.protobuf.ByteString getTxState() {
+ return txState_;
+ }
+
private void initFields() {
tenantId_ = com.google.protobuf.ByteString.EMPTY;
cacheId_ = com.google.protobuf.ByteString.EMPTY;
cachePtr_ = org.apache.phoenix.coprocessor.generated.ServerCachingProtos.ImmutableBytesWritable.getDefaultInstance();
cacheFactory_ = org.apache.phoenix.coprocessor.generated.ServerCacheFactoryProtos.ServerCacheFactory.getDefaultInstance();
+ txState_ = com.google.protobuf.ByteString.EMPTY;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -953,6 +985,9 @@ public final class ServerCachingProtos {
if (((bitField0_ & 0x00000008) == 0x00000008)) {
output.writeMessage(4, cacheFactory_);
}
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ output.writeBytes(5, txState_);
+ }
getUnknownFields().writeTo(output);
}
@@ -978,6 +1013,10 @@ public final class ServerCachingProtos {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(4, cacheFactory_);
}
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(5, txState_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -1021,6 +1060,11 @@ public final class ServerCachingProtos {
result = result && getCacheFactory()
.equals(other.getCacheFactory());
}
+ result = result && (hasTxState() == other.hasTxState());
+ if (hasTxState()) {
+ result = result && getTxState()
+ .equals(other.getTxState());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -1050,6 +1094,10 @@ public final class ServerCachingProtos {
hash = (37 * hash) + CACHEFACTORY_FIELD_NUMBER;
hash = (53 * hash) + getCacheFactory().hashCode();
}
+ if (hasTxState()) {
+ hash = (37 * hash) + TXSTATE_FIELD_NUMBER;
+ hash = (53 * hash) + getTxState().hashCode();
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -1177,6 +1225,8 @@ public final class ServerCachingProtos {
cacheFactoryBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000008);
+ txState_ = com.google.protobuf.ByteString.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000010);
return this;
}
@@ -1229,6 +1279,10 @@ public final class ServerCachingProtos {
} else {
result.cacheFactory_ = cacheFactoryBuilder_.build();
}
+ if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+ to_bitField0_ |= 0x00000010;
+ }
+ result.txState_ = txState_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -1257,6 +1311,9 @@ public final class ServerCachingProtos {
if (other.hasCacheFactory()) {
mergeCacheFactory(other.getCacheFactory());
}
+ if (other.hasTxState()) {
+ setTxState(other.getTxState());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -1610,6 +1667,42 @@ public final class ServerCachingProtos {
return cacheFactoryBuilder_;
}
+ // optional bytes txState = 5;
+ private com.google.protobuf.ByteString txState_ = com.google.protobuf.ByteString.EMPTY;
+ /**
+ * <code>optional bytes txState = 5;</code>
+ */
+ public boolean hasTxState() {
+ return ((bitField0_ & 0x00000010) == 0x00000010);
+ }
+ /**
+ * <code>optional bytes txState = 5;</code>
+ */
+ public com.google.protobuf.ByteString getTxState() {
+ return txState_;
+ }
+ /**
+ * <code>optional bytes txState = 5;</code>
+ */
+ public Builder setTxState(com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000010;
+ txState_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional bytes txState = 5;</code>
+ */
+ public Builder clearTxState() {
+ bitField0_ = (bitField0_ & ~0x00000010);
+ txState_ = getDefaultInstance().getTxState();
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:AddServerCacheRequest)
}
@@ -3383,20 +3476,21 @@ public final class ServerCachingProtos {
"\n\032ServerCachingService.proto\032\030ServerCach" +
"eFactory.proto\"K\n\026ImmutableBytesWritable" +
"\022\021\n\tbyteArray\030\001 \002(\014\022\016\n\006offset\030\002 \002(\005\022\016\n\006l" +
- "ength\030\003 \002(\005\"\220\001\n\025AddServerCacheRequest\022\020\n" +
+ "ength\030\003 \002(\005\"\241\001\n\025AddServerCacheRequest\022\020\n" +
"\010tenantId\030\001 \001(\014\022\017\n\007cacheId\030\002 \002(\014\022)\n\010cach" +
"ePtr\030\003 \002(\0132\027.ImmutableBytesWritable\022)\n\014c" +
- "acheFactory\030\004 \002(\0132\023.ServerCacheFactory\"(" +
- "\n\026AddServerCacheResponse\022\016\n\006return\030\001 \002(\010" +
- "\"=\n\030RemoveServerCacheRequest\022\020\n\010tenantId" +
- "\030\001 \001(\014\022\017\n\007cacheId\030\002 \002(\014\"+\n\031RemoveServerC",
- "acheResponse\022\016\n\006return\030\001 \002(\0102\245\001\n\024ServerC" +
- "achingService\022A\n\016addServerCache\022\026.AddSer" +
- "verCacheRequest\032\027.AddServerCacheResponse" +
- "\022J\n\021removeServerCache\022\031.RemoveServerCach" +
- "eRequest\032\032.RemoveServerCacheResponseBG\n(" +
- "org.apache.phoenix.coprocessor.generated" +
- "B\023ServerCachingProtosH\001\210\001\001\240\001\001"
+ "acheFactory\030\004 \002(\0132\023.ServerCacheFactory\022\017" +
+ "\n\007txState\030\005 \001(\014\"(\n\026AddServerCacheRespons" +
+ "e\022\016\n\006return\030\001 \002(\010\"=\n\030RemoveServerCacheRe" +
+ "quest\022\020\n\010tenantId\030\001 \001(\014\022\017\n\007cacheId\030\002 \002(\014",
+ "\"+\n\031RemoveServerCacheResponse\022\016\n\006return\030" +
+ "\001 \002(\0102\245\001\n\024ServerCachingService\022A\n\016addSer" +
+ "verCache\022\026.AddServerCacheRequest\032\027.AddSe" +
+ "rverCacheResponse\022J\n\021removeServerCache\022\031" +
+ ".RemoveServerCacheRequest\032\032.RemoveServer" +
+ "CacheResponseBG\n(org.apache.phoenix.copr" +
+ "ocessor.generatedB\023ServerCachingProtosH\001" +
+ "\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -3414,7 +3508,7 @@ public final class ServerCachingProtos {
internal_static_AddServerCacheRequest_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_AddServerCacheRequest_descriptor,
- new java.lang.String[] { "TenantId", "CacheId", "CachePtr", "CacheFactory", });
+ new java.lang.String[] { "TenantId", "CacheId", "CachePtr", "CacheFactory", "TxState", });
internal_static_AddServerCacheResponse_descriptor =
getDescriptor().getMessageTypes().get(2);
internal_static_AddServerCacheResponse_fieldAccessorTable = new
http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index 94233c8..d96e339 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -66,6 +66,7 @@ import org.apache.phoenix.util.LogUtil;
import org.apache.phoenix.util.SQLCloseable;
import org.apache.phoenix.util.SQLCloseables;
import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.TransactionUtil;
import org.cloudera.htrace.TraceScope;
import com.google.common.collect.Lists;
@@ -205,13 +206,15 @@ public abstract class BaseQueryPlan implements QueryPlan {
KeyValueSchema schema = ProjectedColumnExpression.buildSchema(dataColumns);
// Set key value schema of the data columns.
serializeSchemaIntoScan(scan, schema);
- String parentSchema = context.getCurrentTable().getTable().getParentSchemaName().getString();
- String parentTable = context.getCurrentTable().getTable().getParentTableName().getString();
+ PTable parentTable = context.getCurrentTable().getTable();
+ String parentSchemaName = parentTable.getParentSchemaName().getString();
+ String parentTableName = parentTable.getParentTableName().getString();
final ParseNodeFactory FACTORY = new ParseNodeFactory();
+ // TODO: is it necessary to re-resolve the table?
TableRef dataTableRef =
FromCompiler.getResolver(
- FACTORY.namedTable(null, TableName.create(parentSchema, parentTable)),
- context.getConnection()).resolveTable(parentSchema, parentTable);
+ FACTORY.namedTable(null, TableName.create(parentSchemaName, parentTableName)),
+ context.getConnection()).resolveTable(parentSchemaName, parentTableName);
PTable dataTable = dataTableRef.getTable();
// Set index maintainer of the local index.
serializeIndexMaintainerIntoScan(scan, dataTable);
@@ -248,7 +251,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
return (scope.getSpan() != null) ? new TracingIterator(scope, iterator) : iterator;
}
- private void serializeIndexMaintainerIntoScan(Scan scan, PTable dataTable) {
+ private void serializeIndexMaintainerIntoScan(Scan scan, PTable dataTable) throws SQLException {
PName name = context.getCurrentTable().getTable().getName();
List<PTable> indexes = Lists.newArrayListWithExpectedSize(1);
for (PTable index : dataTable.getIndexes()) {
@@ -260,6 +263,10 @@ public abstract class BaseQueryPlan implements QueryPlan {
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
IndexMaintainer.serialize(dataTable, ptr, indexes, context.getConnection());
scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD, ByteUtil.copyKeyBytesIfNecessary(ptr));
+ if (dataTable.isTransactional()) {
+ PhoenixConnection conn = context.getConnection();
+ scan.setAttribute(BaseScannerRegionObserver.TX_STATE, TransactionUtil.encodeTxnState(conn.getTransactionContext().getCurrentTransaction()));
+ }
}
private void serializeViewConstantsIntoScan(Scan scan, PTable dataTable) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index edaab9e..8915534 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -394,9 +394,10 @@ public class MutationState implements SQLCloseable {
if (hasIndexMaintainers && isDataTable) {
byte[] attribValue = null;
byte[] uuidValue;
- if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, tempPtr.getLength())) {
+ byte[] txState = table.isTransactional() ? TransactionUtil.encodeTxnState(connection.getTransactionContext().getCurrentTransaction()) : ByteUtil.EMPTY_BYTE_ARRAY;
+ if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, tempPtr.getLength() + txState.length)) {
IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
- cache = client.addIndexMetadataCache(mutations, tempPtr);
+ cache = client.addIndexMetadataCache(mutations, tempPtr, txState);
child.addTimelineAnnotation("Updated index metadata cache");
uuidValue = cache.getId();
// If we haven't retried yet, retry for this case only, as it's possible that
http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
index 1c9782e..b91a52a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
@@ -68,6 +68,15 @@ public interface IndexBuilder extends Stoppable {
* @return a Map of the mutations to make -> target index table name
* @throws IOException on failure
*/
+ /* TODO:
+ Create BaseIndexBuilder with everything except getIndexUpdate().
+ Derive two concrete classes: NonTxIndexBuilder and TxIndexBuilder.
+ NonTxIndexBuilder will be current impl of this method.
+ TxIndexBuilder will use a scan with skipScan over TxAwareHBase to find the latest values.
Conditionally don't do WALEdit stuff for transactional tables (ensure Phoenix/HBase tolerates index WAL edit info not being there)
No-op failure mode
+ */
+
public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation) throws IOException;
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
index 575779a..32e4d84 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
@@ -32,8 +32,6 @@ import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.util.Bytes;
-
-import com.google.common.collect.Lists;
import org.apache.phoenix.hbase.index.covered.KeyValueStore;
import org.apache.phoenix.hbase.index.covered.filter.ApplyAndFilterDeletesFilter;
import org.apache.phoenix.hbase.index.covered.filter.ColumnTrackingNextLargestTimestampFilter;
@@ -41,6 +39,8 @@ import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import com.google.common.collect.Lists;
+
/**
*
*/
@@ -57,6 +57,7 @@ public class ScannerBuilder {
public Scanner buildIndexedColumnScanner(Collection<? extends ColumnReference> indexedColumns, ColumnTracker tracker, long ts) {
+ // TODO: This needs to use some form of the filter that Tephra has when transactional
Filter columnFilters = getColumnFilters(indexedColumns);
FilterList filters = new FilterList(Lists.newArrayList(columnFilters));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
index 70ddc86..c1135bc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
@@ -67,30 +67,32 @@ public class IndexMetaDataCacheClient {
/**
* Send the index metadata cache to all region servers for regions that will handle the mutations.
+ * @param txState TODO
* @return client-side {@link ServerCache} representing the added index metadata cache
* @throws SQLException
* @throws MaxServerCacheSizeExceededException if size of hash cache exceeds max allowed
* size
*/
- public ServerCache addIndexMetadataCache(List<Mutation> mutations, ImmutableBytesWritable ptr) throws SQLException {
+ public ServerCache addIndexMetadataCache(List<Mutation> mutations, ImmutableBytesWritable ptr, byte[] txState) throws SQLException {
/**
* Serialize and compress hashCacheTable
*/
- return serverCache.addServerCache(ScanUtil.newScanRanges(mutations), ptr, new IndexMetaDataCacheFactory(), cacheUsingTableRef);
+ return serverCache.addServerCache(ScanUtil.newScanRanges(mutations), ptr, txState, new IndexMetaDataCacheFactory(), cacheUsingTableRef);
}
/**
* Send the index metadata cache to all region servers for regions that will handle the mutations.
+ * @param txState TODO
* @return client-side {@link ServerCache} representing the added index metadata cache
* @throws SQLException
* @throws MaxServerCacheSizeExceededException if size of hash cache exceeds max allowed
* size
*/
- public ServerCache addIndexMetadataCache(ScanRanges ranges, ImmutableBytesWritable ptr) throws SQLException {
+ public ServerCache addIndexMetadataCache(ScanRanges ranges, ImmutableBytesWritable ptr, byte[] txState) throws SQLException {
/**
* Serialize and compress hashCacheTable
*/
- return serverCache.addServerCache(ranges, ptr, new IndexMetaDataCacheFactory(), cacheUsingTableRef);
+ return serverCache.addServerCache(ranges, ptr, txState, new IndexMetaDataCacheFactory(), cacheUsingTableRef);
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
index 488db44..8b1ee18 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
@@ -24,11 +24,14 @@ import java.io.IOException;
import java.sql.SQLException;
import java.util.List;
+import co.cask.tephra.Transaction;
+
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.cache.IndexMetaDataCache;
import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.util.TransactionUtil;
public class IndexMetaDataCacheFactory implements ServerCacheFactory {
public IndexMetaDataCacheFactory() {
@@ -43,10 +46,16 @@ public class IndexMetaDataCacheFactory implements ServerCacheFactory {
}
@Override
- public Closeable newCache (ImmutableBytesWritable cachePtr, final MemoryChunk chunk) throws SQLException {
+ public Closeable newCache (ImmutableBytesWritable cachePtr, byte[] txState, final MemoryChunk chunk) throws SQLException {
// just use the standard keyvalue builder - this doesn't really need to be fast
- final List<IndexMaintainer> maintainers =
+ final List<IndexMaintainer> maintainers =
IndexMaintainer.deserialize(cachePtr, GenericKeyValueBuilder.INSTANCE);
+ final Transaction txn;
+ try {
+ txn = TransactionUtil.decodeTxnState(txState);
+ } catch (IOException e) {
+ throw new SQLException(e);
+ }
return new IndexMetaDataCache() {
@Override
@@ -58,6 +67,11 @@ public class IndexMetaDataCacheFactory implements ServerCacheFactory {
public List<IndexMaintainer> getIndexMaintainers() {
return maintainers;
}
+
+ @Override
+ public Transaction getTransaction() {
+ return txn;
+ }
};
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
index b89c807..eb117bf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
@@ -58,7 +58,7 @@ public class PhoenixIndexBuilder extends CoveredColumnsIndexBuilder {
for (int i = 0; i < miniBatchOp.size(); i++) {
Mutation m = miniBatchOp.getOperation(i);
keys.add(PVarbinary.INSTANCE.getKeyRange(m.getRow()));
- List<IndexMaintainer> indexMaintainers = getCodec().getIndexMaintainers(m.getAttributesMap());
+ List<IndexMaintainer> indexMaintainers = getCodec().getIndexMetaData(m.getAttributesMap()).getIndexMaintainers();
for(IndexMaintainer indexMaintainer: indexMaintainers) {
if (indexMaintainer.isImmutableRows() && indexMaintainer.isLocalIndex()) continue;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
index 99e26d1..8b507b6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
@@ -23,10 +23,10 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
+import co.cask.tephra.Transaction;
+
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Pair;
@@ -34,6 +34,7 @@ import org.apache.phoenix.cache.GlobalCache;
import org.apache.phoenix.cache.IndexMetaDataCache;
import org.apache.phoenix.cache.ServerCacheClient;
import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.hbase.index.ValueGetter;
@@ -50,6 +51,7 @@ import org.apache.phoenix.hbase.index.write.IndexWriter;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.TransactionUtil;
import com.google.common.collect.Lists;
@@ -78,18 +80,47 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
this.kvBuilder = GenericKeyValueBuilder.INSTANCE;
}
- List<IndexMaintainer> getIndexMaintainers(Map<String, byte[]> attributes) throws IOException{
+ boolean hasIndexMaintainers(Map<String, byte[]> attributes) {
if (attributes == null) {
- return Collections.emptyList();
+ return false;
}
byte[] uuid = attributes.get(INDEX_UUID);
if (uuid == null) {
- return Collections.emptyList();
+ return false;
+ }
+ return true;
+ }
+
+ IndexMetaDataCache getIndexMetaData(Map<String, byte[]> attributes) throws IOException{
+ if (attributes == null) {
+ return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE;
+ }
+ byte[] uuid = attributes.get(INDEX_UUID);
+ if (uuid == null) {
+ return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE;
}
byte[] md = attributes.get(INDEX_MD);
- List<IndexMaintainer> indexMaintainers;
+ byte[] txState = attributes.get(BaseScannerRegionObserver.TX_STATE);
if (md != null) {
- indexMaintainers = IndexMaintainer.deserialize(md);
+ final List<IndexMaintainer> indexMaintainers = IndexMaintainer.deserialize(md);
+ final Transaction txn = TransactionUtil.decodeTxnState(txState);
+ return new IndexMetaDataCache() {
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public List<IndexMaintainer> getIndexMaintainers() {
+ return indexMaintainers;
+ }
+
+ @Override
+ public Transaction getTransaction() {
+ return txn;
+ }
+
+ };
} else {
byte[] tenantIdBytes = attributes.get(PhoenixRuntime.TENANT_ID_ATTRIB);
ImmutableBytesWritable tenantId =
@@ -103,10 +134,9 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
.setMessage(msg).build().buildException();
ServerUtil.throwIOException("Index update failed", e); // will not return
}
- indexMaintainers = indexCache.getIndexMaintainers();
+ return indexCache;
}
- return indexMaintainers;
}
@Override
@@ -127,7 +157,8 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
* @throws IOException
*/
private Iterable<IndexUpdate> getIndexUpdates(TableState state, boolean upsert) throws IOException {
- List<IndexMaintainer> indexMaintainers = getIndexMaintainers(state.getUpdateAttributes());
+ IndexMetaDataCache indexCache = getIndexMetaData(state.getUpdateAttributes());
+ List<IndexMaintainer> indexMaintainers = indexCache.getIndexMaintainers();
if (indexMaintainers.isEmpty()) {
return Collections.emptyList();
}
@@ -187,7 +218,7 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
@Override
public boolean isEnabled(Mutation m) throws IOException {
- return !getIndexMaintainers(m.getAttributesMap()).isEmpty();
+ return !hasIndexMaintainers(m.getAttributesMap());
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index dffd7c4..a646d96 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -439,12 +439,16 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
public void startTransaction() throws SQLException {
if (txContext == null) {
+ boolean success = false;
try {
TransactionSystemClient txServiceClient = this.getQueryServices().getTransactionSystemClient();
this.txContext = new TransactionContext(txServiceClient);
txContext.start();
+ success = true;
} catch (TransactionFailureException e) {
throw new SQLException(e); // TODO: error code
+ } finally {
+ if (!success) endTransaction();
}
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
index f13b28e..e8b3389 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
@@ -39,6 +39,7 @@ import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.TrustedByteArrayOutputStream;
import org.apache.phoenix.util.TupleUtil;
@@ -80,7 +81,7 @@ public class HashCacheClient {
*/
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
serialize(ptr, iterator, estimatedSize, onExpressions, singleValueOnly, keyRangeRhsExpression, keyRangeRhsValues);
- return serverCache.addServerCache(keyRanges, ptr, new HashCacheFactory(), cacheUsingTableRef);
+ return serverCache.addServerCache(keyRanges, ptr, ByteUtil.EMPTY_BYTE_ARRAY, new HashCacheFactory(), cacheUsingTableRef);
}
private void serialize(ImmutableBytesWritable ptr, ResultIterator iterator, long estimatedSize, List<Expression> onExpressions, boolean singleValueOnly, Expression keyRangeRhsExpression, List<Expression> keyRangeRhsValues) throws SQLException {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
index 8cae51a..3072736 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
@@ -56,7 +56,7 @@ public class HashCacheFactory implements ServerCacheFactory {
}
@Override
- public Closeable newCache(ImmutableBytesWritable cachePtr, MemoryChunk chunk) throws SQLException {
+ public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] txState, MemoryChunk chunk) throws SQLException {
try {
// This reads the uncompressed length from the front of the compressed input
int uncompressedLen = Snappy.getUncompressedLength(cachePtr.get(), cachePtr.getOffset());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
index 34bba47..6058711 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
@@ -45,12 +45,8 @@ public class TransactionUtil {
}
}
- public static Transaction decodeTxnState(byte[] txnBytes) throws SQLException {
- try {
- return codec.decode(txnBytes);
- } catch (IOException e) {
- throw new SQLException(e);
- }
+ public static Transaction decodeTxnState(byte[] txnBytes) throws IOException {
+ return txnBytes == null ? null : codec.decode(txnBytes);
}
public static SQLException getSQLException(TransactionFailureException e) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-protocol/src/main/ServerCachingService.proto
----------------------------------------------------------------------
diff --git a/phoenix-protocol/src/main/ServerCachingService.proto b/phoenix-protocol/src/main/ServerCachingService.proto
index 63aeef9..dcbaabd 100644
--- a/phoenix-protocol/src/main/ServerCachingService.proto
+++ b/phoenix-protocol/src/main/ServerCachingService.proto
@@ -37,6 +37,7 @@ message AddServerCacheRequest {
required bytes cacheId = 2;
required ImmutableBytesWritable cachePtr = 3;
required ServerCacheFactory cacheFactory = 4;
+ optional bytes txState = 5;
}
message AddServerCacheResponse {