You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by je...@apache.org on 2014/02/08 08:42:58 UTC
[09/11] Port Phoenix to HBase 0.98
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index bc357b1..cc0dc05 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -52,8 +52,6 @@ import static org.apache.phoenix.schema.PTableType.INDEX;
import static org.apache.phoenix.util.SchemaUtil.getVarCharLength;
import static org.apache.phoenix.util.SchemaUtil.getVarChars;
-import java.io.DataInput;
-import java.io.DataOutput;
import java.io.IOException;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
@@ -63,14 +61,19 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
@@ -80,11 +83,25 @@ import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
import org.apache.hadoop.hbase.index.util.IndexManagementUtil;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegion.RowLock;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.AddColumnRequest;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheRequest;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.CreateTableRequest;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropColumnRequest;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropTableRequest;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetTableRequest;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionRequest;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionResponse;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.protobuf.ProtobufUtil;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.AmbiguousColumnException;
import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
@@ -113,6 +130,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
/**
*
@@ -129,7 +149,7 @@ import com.google.common.collect.Lists;
*
* @since 0.1
*/
-public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements MetaDataProtocol {
+public class MetaDataEndpointImpl extends MetaDataProtocol implements CoprocessorService, Coprocessor {
private static final Logger logger = LoggerFactory.getLogger(MetaDataEndpointImpl.class);
// KeyValues for Table
@@ -203,7 +223,8 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
private static final int COLUMN_SIZE_INDEX = COLUMN_KV_COLUMNS.indexOf(COLUMN_SIZE_KV);
private static final int NULLABLE_INDEX = COLUMN_KV_COLUMNS.indexOf(NULLABLE_KV);
private static final int SQL_DATA_TYPE_INDEX = COLUMN_KV_COLUMNS.indexOf(DATA_TYPE_KV);
- private static final int ORDINAL_POSITION_INDEX = COLUMN_KV_COLUMNS.indexOf(ORDINAL_POSITION_KV);
+ private static final int ORDINAL_POSITION_INDEX = COLUMN_KV_COLUMNS
+ .indexOf(ORDINAL_POSITION_KV);
private static final int COLUMN_MODIFIER_INDEX = COLUMN_KV_COLUMNS.indexOf(COLUMN_MODIFIER_KV);
private static final int ARRAY_SIZE_INDEX = COLUMN_KV_COLUMNS.indexOf(ARRAY_SIZE_KV);
@@ -219,8 +240,9 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
System.arraycopy(keyBuffer, keyOffset, pnameBuf, 0, length);
return PNameFactory.newName(pnameBuf);
}
-
- private static Scan newTableRowsScan(byte[] key, long startTimeStamp, long stopTimeStamp) throws IOException {
+
+ private static Scan newTableRowsScan(byte[] key, long startTimeStamp, long stopTimeStamp)
+ throws IOException {
Scan scan = new Scan();
scan.setTimeRange(startTimeStamp, stopTimeStamp);
scan.setStartRow(key);
@@ -230,18 +252,159 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
return scan;
}
+ private RegionCoprocessorEnvironment env;
+
+ private static MetaDataMutationResult checkTableKeyInRegion(byte[] key, HRegion region) {
+ byte[] startKey = region.getStartKey();
+ byte[] endKey = region.getEndKey();
+ if (Bytes.compareTo(startKey, key) <= 0
+ && (Bytes.compareTo(HConstants.LAST_ROW, endKey) == 0 || Bytes.compareTo(key,
+ endKey) < 0)) {
+ return null; // normal case;
+ }
+ return new MetaDataMutationResult(MutationCode.TABLE_NOT_IN_REGION,
+ EnvironmentEdgeManager.currentTimeMillis(), null);
+ }
+
+ /**
+ * Stores a reference to the coprocessor environment provided by the
+ * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this
+ * coprocessor is loaded. Since this is a coprocessor endpoint, it always expects to be loaded
+ * on a table region, so always expects this to be an instance of
+ * {@link RegionCoprocessorEnvironment}.
+ * @param env the environment provided by the coprocessor host
+ * @throws IOException if the provided environment is not an instance of
+ * {@code RegionCoprocessorEnvironment}
+ */
@Override
- public RegionCoprocessorEnvironment getEnvironment() {
- return (RegionCoprocessorEnvironment)super.getEnvironment();
+ public void start(CoprocessorEnvironment env) throws IOException {
+ if (env instanceof RegionCoprocessorEnvironment) {
+ this.env = (RegionCoprocessorEnvironment) env;
+ } else {
+ throw new CoprocessorException("Must be loaded on a table region!");
+ }
}
-
- private PTable buildTable(byte[] key, ImmutableBytesPtr cacheKey, HRegion region, long clientTimeStamp) throws IOException, SQLException {
+
+ @Override
+ public void stop(CoprocessorEnvironment env) throws IOException {
+ // nothing to do
+ }
+
+ @Override
+ public Service getService() {
+ return this;
+ }
+
+ @Override
+ public void getTable(RpcController controller, GetTableRequest request,
+ RpcCallback<MetaDataResponse> done) {
+ MetaDataResponse.Builder builder = MetaDataResponse.newBuilder();
+ byte[] tenantId = request.getTenantId().toByteArray();
+ byte[] schemaName = request.getSchemaName().toByteArray();
+ byte[] tableName = request.getTableName().toByteArray();
+ byte[] key = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
+ long tableTimeStamp = request.getTableTimestamp();
+
+ try {
+ // Verify that the key falls within this region's start/end keys; if it does not, return
+ // TABLE_NOT_IN_REGION so that the client looks the region up again from meta.
+ HRegion region = env.getRegion();
+ MetaDataMutationResult result = checkTableKeyInRegion(key, region);
+ if (result != null) {
+ done.run(MetaDataMutationResult.toProto(result));
+ return;
+ }
+
+ long currentTime = EnvironmentEdgeManager.currentTimeMillis();
+ PTable table = doGetTable(key, request.getClientTimestamp());
+ if (table == null) {
+ builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_NOT_FOUND);
+ builder.setMutationTime(currentTime);
+ done.run(builder.build());
+ return;
+ }
+ builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_ALREADY_EXISTS);
+ builder.setMutationTime(currentTime);
+ if (table.getTimeStamp() != tableTimeStamp) {
+ builder.setTable(PTableImpl.toProto(table));
+ }
+ done.run(builder.build());
+ return;
+ } catch (Throwable t) {
+ logger.error("getTable failed", t);
+ ProtobufUtil.setControllerException(controller,
+ ServerUtil.createIOException(SchemaUtil.getTableName(schemaName, tableName), t));
+ }
+ }
+
+ private PTable doGetTable(byte[] key, long clientTimeStamp) throws IOException, SQLException {
+ ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
+ Map<ImmutableBytesPtr, PTable> metaDataCache =
+ GlobalCache.getInstance(this.env).getMetaDataCache();
+ PTable table = metaDataCache.get(cacheKey);
+ // We only cache the latest version, so we'll end up building the table on every call if
+ // the client connection has specified an SCN.
+ // TODO: If we indicated to the client that we're returning an older version while a newer
+ // version is available, the client could safely skip this call, since we only allow
+ // modifications to the latest version.
+ if (table != null && table.getTimeStamp() < clientTimeStamp) {
+ // Table on client is up-to-date with table on server, so just return
+ if (isTableDeleted(table)) {
+ return null;
+ }
+ return table;
+ }
+ // Open question (ask Lars): how expensive is this call? Even if we don't take the lock,
+ // we still won't get partial results.
+ // Get the region from the coprocessor environment.
+ // TODO: check that key is within region.getStartKey() and region.getEndKey()
+ // and return special code to force client to lookup region from meta.
+ HRegion region = env.getRegion();
+ /*
+ * Lock directly on key, though it may be an index table. This will just prevent a table
+ * from getting rebuilt too often.
+ */
+ RowLock rowLock = region.getRowLock(key);
+ if (rowLock == null) {
+ throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key));
+ }
+ try {
+ // Try cache again in case we were waiting on a lock
+ table = metaDataCache.get(cacheKey);
+ // We only cache the latest version, so we'll end up building the table on every call if
+ // the client connection has specified an SCN.
+ // TODO: If we indicated to the client that we're returning an older version while a newer
+ // version is available, the client could safely skip this call, since we only allow
+ // modifications to the latest version.
+ if (table != null && table.getTimeStamp() < clientTimeStamp) {
+ // Table on client is up-to-date with table on server, so just return
+ if (isTableDeleted(table)) {
+ return null;
+ }
+ return table;
+ }
+ // Query for the latest table first, since it's not cached
+ table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP);
+ if (table != null && table.getTimeStamp() < clientTimeStamp) {
+ return table;
+ }
+ // Otherwise, query for an older version of the table - it won't be cached
+ return buildTable(key, cacheKey, region, clientTimeStamp);
+ } finally {
+ rowLock.release();
+ }
+ }
+
+ private PTable buildTable(byte[] key, ImmutableBytesPtr cacheKey, HRegion region,
+ long clientTimeStamp) throws IOException, SQLException {
Scan scan = newTableRowsScan(key, MIN_TABLE_TIMESTAMP, clientTimeStamp);
RegionScanner scanner = region.getScanner(scan);
- Map<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.getEnvironment()).getMetaDataCache();
+ Map<ImmutableBytesPtr, PTable> metaDataCache =
+ GlobalCache.getInstance(this.env).getMetaDataCache();
try {
PTable oldTable = metaDataCache.get(cacheKey);
- long tableTimeStamp = oldTable == null ? MIN_TABLE_TIMESTAMP-1 : oldTable.getTimeStamp();
+ long tableTimeStamp =
+ oldTable == null ? MIN_TABLE_TIMESTAMP - 1 : oldTable.getTimeStamp();
PTable newTable;
newTable = getTable(scanner, clientTimeStamp, tableTimeStamp);
if (newTable == null) {
@@ -249,14 +412,24 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
}
if (oldTable == null || tableTimeStamp < newTable.getTimeStamp()) {
if (logger.isDebugEnabled()) {
- logger.debug("Caching table " + Bytes.toStringBinary(cacheKey.get(), cacheKey.getOffset(), cacheKey.getLength()) + " at seqNum " + newTable.getSequenceNumber() + " with newer timestamp " + newTable.getTimeStamp() + " versus " + tableTimeStamp);
+ logger.debug("Caching table "
+ + Bytes.toStringBinary(cacheKey.get(), cacheKey.getOffset(),
+ cacheKey.getLength()) + " at seqNum "
+ + newTable.getSequenceNumber() + " with newer timestamp "
+ + newTable.getTimeStamp() + " versus " + tableTimeStamp);
}
oldTable = metaDataCache.put(cacheKey, newTable);
if (logger.isDebugEnabled()) {
if (oldTable == null) {
- logger.debug("No previously cached table " + Bytes.toStringBinary(cacheKey.get(), cacheKey.getOffset(), cacheKey.getLength()));
+ logger.debug("No previously cached table "
+ + Bytes.toStringBinary(cacheKey.get(), cacheKey.getOffset(),
+ cacheKey.getLength()));
} else {
- logger.debug("Previously cached table " + Bytes.toStringBinary(cacheKey.get(), cacheKey.getOffset(), cacheKey.getLength()) + " was at seqNum " + oldTable.getSequenceNumber() + " with timestamp " + oldTable.getTimeStamp());
+ logger.debug("Previously cached table "
+ + Bytes.toStringBinary(cacheKey.get(), cacheKey.getOffset(),
+ cacheKey.getLength()) + " was at seqNum "
+ + oldTable.getSequenceNumber() + " with timestamp "
+ + oldTable.getTimeStamp());
}
}
}
@@ -276,14 +449,17 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
indexes.add(indexTable);
}
- private void addColumnToTable(List<KeyValue> results, PName colName, PName famName, KeyValue[] colKeyValues, List<PColumn> columns) {
+ private void addColumnToTable(List<Cell> results, PName colName, PName famName,
+ Cell[] colKeyValues, List<PColumn> columns) {
int i = 0;
int j = 0;
while (i < results.size() && j < COLUMN_KV_COLUMNS.size()) {
- KeyValue kv = results.get(i);
- KeyValue searchKv = COLUMN_KV_COLUMNS.get(j);
- int cmp = Bytes.compareTo(kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength(),
- searchKv.getBuffer(), searchKv.getQualifierOffset(), searchKv.getQualifierLength());
+ Cell kv = results.get(i);
+ Cell searchKv = COLUMN_KV_COLUMNS.get(j);
+ int cmp =
+ Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(),
+ kv.getQualifierLength(), searchKv.getQualifierArray(),
+ searchKv.getQualifierOffset(), searchKv.getQualifierLength());
if (cmp == 0) {
colKeyValues[j++] = kv;
i++;
@@ -293,53 +469,77 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
i++; // shouldn't happen - means unexpected KV in system table column row
}
}
- // COLUMN_SIZE and DECIMAL_DIGIT are optional. NULLABLE, DATA_TYPE and ORDINAL_POSITION_KV are required.
+ // COLUMN_SIZE and DECIMAL_DIGIT are optional. NULLABLE, DATA_TYPE and ORDINAL_POSITION_KV
+ // are required.
if (colKeyValues[SQL_DATA_TYPE_INDEX] == null || colKeyValues[NULLABLE_INDEX] == null
|| colKeyValues[ORDINAL_POSITION_INDEX] == null) {
- throw new IllegalStateException("Didn't find all required key values in '" + colName.getString() + "' column metadata row");
+ throw new IllegalStateException("Didn't find all required key values in '"
+ + colName.getString() + "' column metadata row");
}
- KeyValue columnSizeKv = colKeyValues[COLUMN_SIZE_INDEX];
- Integer maxLength = columnSizeKv == null ? null : PDataType.INTEGER.getCodec().decodeInt(columnSizeKv.getBuffer(), columnSizeKv.getValueOffset(), null);
- KeyValue decimalDigitKv = colKeyValues[DECIMAL_DIGITS_INDEX];
- Integer scale = decimalDigitKv == null ? null : PDataType.INTEGER.getCodec().decodeInt(decimalDigitKv.getBuffer(), decimalDigitKv.getValueOffset(), null);
- KeyValue ordinalPositionKv = colKeyValues[ORDINAL_POSITION_INDEX];
- int position = PDataType.INTEGER.getCodec().decodeInt(ordinalPositionKv.getBuffer(), ordinalPositionKv.getValueOffset(), null);
- KeyValue nullableKv = colKeyValues[NULLABLE_INDEX];
- boolean isNullable = PDataType.INTEGER.getCodec().decodeInt(nullableKv.getBuffer(), nullableKv.getValueOffset(), null) != ResultSetMetaData.columnNoNulls;
- KeyValue sqlDataTypeKv = colKeyValues[SQL_DATA_TYPE_INDEX];
- PDataType dataType = PDataType.fromTypeId(PDataType.INTEGER.getCodec().decodeInt(sqlDataTypeKv.getBuffer(), sqlDataTypeKv.getValueOffset(), null));
- if (maxLength == null && dataType == PDataType.BINARY) dataType = PDataType.VARBINARY; // For backward compatibility.
- KeyValue columnModifierKv = colKeyValues[COLUMN_MODIFIER_INDEX];
- ColumnModifier sortOrder = columnModifierKv == null ? null : ColumnModifier.fromSystemValue(PDataType.INTEGER.getCodec().decodeInt(columnModifierKv.getBuffer(), columnModifierKv.getValueOffset(), null));
- KeyValue arraySizeKv = colKeyValues[ARRAY_SIZE_INDEX];
- Integer arraySize = arraySizeKv == null ? null : PDataType.INTEGER.getCodec().decodeInt(arraySizeKv.getBuffer(), arraySizeKv.getValueOffset(), null);
- PColumn column = new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable, position-1, sortOrder, arraySize);
+ Cell columnSizeKv = colKeyValues[COLUMN_SIZE_INDEX];
+ Integer maxLength =
+ columnSizeKv == null ? null : PDataType.INTEGER.getCodec().decodeInt(
+ columnSizeKv.getValueArray(), columnSizeKv.getValueOffset(), null);
+ Cell decimalDigitKv = colKeyValues[DECIMAL_DIGITS_INDEX];
+ Integer scale =
+ decimalDigitKv == null ? null : PDataType.INTEGER.getCodec().decodeInt(
+ decimalDigitKv.getValueArray(), decimalDigitKv.getValueOffset(), null);
+ Cell ordinalPositionKv = colKeyValues[ORDINAL_POSITION_INDEX];
+ int position =
+ PDataType.INTEGER.getCodec().decodeInt(ordinalPositionKv.getValueArray(),
+ ordinalPositionKv.getValueOffset(), null);
+ Cell nullableKv = colKeyValues[NULLABLE_INDEX];
+ boolean isNullable =
+ PDataType.INTEGER.getCodec().decodeInt(nullableKv.getValueArray(),
+ nullableKv.getValueOffset(), null) != ResultSetMetaData.columnNoNulls;
+ Cell sqlDataTypeKv = colKeyValues[SQL_DATA_TYPE_INDEX];
+ PDataType dataType =
+ PDataType.fromTypeId(PDataType.INTEGER.getCodec().decodeInt(
+ sqlDataTypeKv.getValueArray(), sqlDataTypeKv.getValueOffset(), null));
+ // For backward compatibility: a BINARY column stored without a max length is treated
+ // as VARBINARY.
+ if (maxLength == null && dataType == PDataType.BINARY) dataType = PDataType.VARBINARY;
+ Cell columnModifierKv = colKeyValues[COLUMN_MODIFIER_INDEX];
+ ColumnModifier sortOrder =
+ columnModifierKv == null ? null : ColumnModifier.fromSystemValue(PDataType.INTEGER
+ .getCodec().decodeInt(columnModifierKv.getValueArray(),
+ columnModifierKv.getValueOffset(), null));
+
+ Cell arraySizeKv = colKeyValues[ARRAY_SIZE_INDEX];
+ Integer arraySize = arraySizeKv == null ? null :
+ PDataType.INTEGER.getCodec().decodeInt(arraySizeKv.getValueArray(), arraySizeKv.getValueOffset(), null);
+
+ PColumn column =
+ new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable,
+ position - 1, sortOrder, arraySize);
columns.add(column);
}
-
- private PTable getTable(RegionScanner scanner, long clientTimeStamp, long tableTimeStamp) throws IOException, SQLException {
- List<KeyValue> results = Lists.newArrayList();
+
+ private PTable getTable(RegionScanner scanner, long clientTimeStamp, long tableTimeStamp)
+ throws IOException, SQLException {
+ List<Cell> results = Lists.newArrayList();
scanner.next(results);
if (results.isEmpty()) {
return null;
}
- KeyValue[] tableKeyValues = new KeyValue[TABLE_KV_COLUMNS.size()];
- KeyValue[] colKeyValues = new KeyValue[COLUMN_KV_COLUMNS.size()];
-
+ Cell[] tableKeyValues = new Cell[TABLE_KV_COLUMNS.size()];
+ Cell[] colKeyValues = new Cell[COLUMN_KV_COLUMNS.size()];
+
// Create PTable based on KeyValues from scan
- KeyValue keyValue = results.get(0);
- byte[] keyBuffer = keyValue.getBuffer();
+ Cell keyValue = results.get(0);
+ byte[] keyBuffer = keyValue.getRowArray();
int keyLength = keyValue.getRowLength();
int keyOffset = keyValue.getRowOffset();
PName tenantId = newPName(keyBuffer, keyOffset, keyLength);
int tenantIdLength = tenantId.getBytes().length;
- PName schemaName = newPName(keyBuffer, keyOffset+tenantIdLength+1, keyLength);
+ PName schemaName = newPName(keyBuffer, keyOffset + tenantIdLength + 1, keyLength);
int schemaNameLength = schemaName.getBytes().length;
- int tableNameLength = keyLength-schemaNameLength-1-tenantIdLength-1;
+ int tableNameLength = keyLength - schemaNameLength - 1 - tenantIdLength - 1;
byte[] tableNameBytes = new byte[tableNameLength];
- System.arraycopy(keyBuffer, keyOffset+schemaNameLength+1+tenantIdLength+1, tableNameBytes, 0, tableNameLength);
+ System.arraycopy(keyBuffer, keyOffset + schemaNameLength + 1 + tenantIdLength + 1,
+ tableNameBytes, 0, tableNameLength);
PName tableName = PNameFactory.newName(tableNameBytes);
-
+
int offset = tenantIdLength + schemaNameLength + tableNameLength + 3;
// This will prevent the client from continually looking for the current
// table when we know that there will never be one since we disallow updates
@@ -349,20 +549,23 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
// bump up the timeStamp to right before the client time stamp, since
// we know it can't possibly change.
long timeStamp = keyValue.getTimestamp();
-// long timeStamp = tableTimeStamp > keyValue.getTimestamp() &&
-// clientTimeStamp < tableTimeStamp
-// ? clientTimeStamp-1
-// : keyValue.getTimestamp();
-
+ // long timeStamp = tableTimeStamp > keyValue.getTimestamp() &&
+ // clientTimeStamp < tableTimeStamp
+ // ? clientTimeStamp-1
+ // : keyValue.getTimestamp();
+
int i = 0;
int j = 0;
while (i < results.size() && j < TABLE_KV_COLUMNS.size()) {
- KeyValue kv = results.get(i);
- KeyValue searchKv = TABLE_KV_COLUMNS.get(j);
- int cmp = Bytes.compareTo(kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength(),
- searchKv.getBuffer(), searchKv.getQualifierOffset(), searchKv.getQualifierLength());
+ Cell kv = results.get(i);
+ Cell searchKv = TABLE_KV_COLUMNS.get(j);
+ int cmp =
+ Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(),
+ kv.getQualifierLength(), searchKv.getQualifierArray(),
+ searchKv.getQualifierOffset(), searchKv.getQualifierLength());
if (cmp == 0) {
- timeStamp = Math.max(timeStamp, kv.getTimestamp()); // Find max timestamp of table header row
+ timeStamp = Math.max(timeStamp, kv.getTimestamp()); // Find max timestamp of table
+ // header row
tableKeyValues[j++] = kv;
i++;
} else if (cmp > 0) {
@@ -374,83 +577,105 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
// TABLE_TYPE, TABLE_SEQ_NUM and COLUMN_COUNT are required.
if (tableKeyValues[TABLE_TYPE_INDEX] == null || tableKeyValues[TABLE_SEQ_NUM_INDEX] == null
|| tableKeyValues[COLUMN_COUNT_INDEX] == null) {
- throw new IllegalStateException("Didn't find expected key values for table row in metadata row");
+ throw new IllegalStateException(
+ "Didn't find expected key values for table row in metadata row");
}
- KeyValue tableTypeKv = tableKeyValues[TABLE_TYPE_INDEX];
- PTableType tableType = PTableType.fromSerializedValue(tableTypeKv.getBuffer()[tableTypeKv.getValueOffset()]);
- KeyValue tableSeqNumKv = tableKeyValues[TABLE_SEQ_NUM_INDEX];
- long tableSeqNum = PDataType.LONG.getCodec().decodeLong(tableSeqNumKv.getBuffer(), tableSeqNumKv.getValueOffset(), null);
- KeyValue columnCountKv = tableKeyValues[COLUMN_COUNT_INDEX];
- int columnCount = PDataType.INTEGER.getCodec().decodeInt(columnCountKv.getBuffer(), columnCountKv.getValueOffset(), null);
- KeyValue pkNameKv = tableKeyValues[PK_NAME_INDEX];
- PName pkName = pkNameKv != null ? newPName(pkNameKv.getBuffer(), pkNameKv.getValueOffset(), pkNameKv.getValueLength()) : null;
- KeyValue saltBucketNumKv = tableKeyValues[SALT_BUCKETS_INDEX];
- Integer saltBucketNum = saltBucketNumKv != null ? (Integer)PDataType.INTEGER.getCodec().decodeInt(saltBucketNumKv.getBuffer(), saltBucketNumKv.getValueOffset(), null) : null;
- KeyValue dataTableNameKv = tableKeyValues[DATA_TABLE_NAME_INDEX];
- PName dataTableName = dataTableNameKv != null ? newPName(dataTableNameKv.getBuffer(), dataTableNameKv.getValueOffset(), dataTableNameKv.getValueLength()) : null;
- KeyValue indexStateKv = tableKeyValues[INDEX_STATE_INDEX];
- PIndexState indexState = indexStateKv == null ? null : PIndexState.fromSerializedValue(indexStateKv.getBuffer()[indexStateKv.getValueOffset()]);
- KeyValue immutableRowsKv = tableKeyValues[IMMUTABLE_ROWS_INDEX];
- boolean isImmutableRows = immutableRowsKv == null ? false : (Boolean)PDataType.BOOLEAN.toObject(immutableRowsKv.getBuffer(), immutableRowsKv.getValueOffset(), immutableRowsKv.getValueLength());
- KeyValue defaultFamilyNameKv = tableKeyValues[DEFAULT_COLUMN_FAMILY_INDEX];
- PName defaultFamilyName = defaultFamilyNameKv != null ? newPName(defaultFamilyNameKv.getBuffer(), defaultFamilyNameKv.getValueOffset(), defaultFamilyNameKv.getValueLength()) : null;
- KeyValue viewStatementKv = tableKeyValues[VIEW_STATEMENT_INDEX];
- String viewStatement = viewStatementKv != null ? (String)PDataType.VARCHAR.toObject(viewStatementKv.getBuffer(), viewStatementKv.getValueOffset(), viewStatementKv.getValueLength()) : null;
- KeyValue disableWALKv = tableKeyValues[DISABLE_WAL_INDEX];
- boolean disableWAL = disableWALKv == null ? PTable.DEFAULT_DISABLE_WAL : Boolean.TRUE.equals(PDataType.BOOLEAN.toObject(disableWALKv.getBuffer(), disableWALKv.getValueOffset(), disableWALKv.getValueLength()));
- KeyValue multiTenantKv = tableKeyValues[MULTI_TENANT_INDEX];
- boolean multiTenant = multiTenantKv == null ? false : Boolean.TRUE.equals(PDataType.BOOLEAN.toObject(multiTenantKv.getBuffer(), multiTenantKv.getValueOffset(), multiTenantKv.getValueLength()));
- KeyValue viewTypeKv = tableKeyValues[VIEW_TYPE_INDEX];
- ViewType viewType = viewTypeKv == null ? null : ViewType.fromSerializedValue(viewTypeKv.getBuffer()[viewTypeKv.getValueOffset()]);
+ Cell tableTypeKv = tableKeyValues[TABLE_TYPE_INDEX];
+ PTableType tableType =
+ PTableType
+ .fromSerializedValue(tableTypeKv.getValueArray()[tableTypeKv.getValueOffset()]);
+ Cell tableSeqNumKv = tableKeyValues[TABLE_SEQ_NUM_INDEX];
+ long tableSeqNum =
+ PDataType.LONG.getCodec().decodeLong(tableSeqNumKv.getValueArray(),
+ tableSeqNumKv.getValueOffset(), null);
+ Cell columnCountKv = tableKeyValues[COLUMN_COUNT_INDEX];
+ int columnCount =
+ PDataType.INTEGER.getCodec().decodeInt(columnCountKv.getValueArray(),
+ columnCountKv.getValueOffset(), null);
+ Cell pkNameKv = tableKeyValues[PK_NAME_INDEX];
+ PName pkName =
+ pkNameKv != null ? newPName(pkNameKv.getValueArray(), pkNameKv.getValueOffset(),
+ pkNameKv.getValueLength()) : null;
+ Cell saltBucketNumKv = tableKeyValues[SALT_BUCKETS_INDEX];
+ Integer saltBucketNum =
+ saltBucketNumKv != null ? (Integer) PDataType.INTEGER.getCodec().decodeInt(
+ saltBucketNumKv.getValueArray(), saltBucketNumKv.getValueOffset(), null) : null;
+ Cell dataTableNameKv = tableKeyValues[DATA_TABLE_NAME_INDEX];
+ PName dataTableName =
+ dataTableNameKv != null ? newPName(dataTableNameKv.getValueArray(),
+ dataTableNameKv.getValueOffset(), dataTableNameKv.getValueLength()) : null;
+ Cell indexStateKv = tableKeyValues[INDEX_STATE_INDEX];
+ PIndexState indexState =
+ indexStateKv == null ? null : PIndexState.fromSerializedValue(indexStateKv
+ .getValueArray()[indexStateKv.getValueOffset()]);
+ Cell immutableRowsKv = tableKeyValues[IMMUTABLE_ROWS_INDEX];
+ boolean isImmutableRows =
+ immutableRowsKv == null ? false : (Boolean) PDataType.BOOLEAN.toObject(
+ immutableRowsKv.getValueArray(), immutableRowsKv.getValueOffset(),
+ immutableRowsKv.getValueLength());
+ Cell defaultFamilyNameKv = tableKeyValues[DEFAULT_COLUMN_FAMILY_INDEX];
+ PName defaultFamilyName = defaultFamilyNameKv != null ? newPName(defaultFamilyNameKv.getValueArray(), defaultFamilyNameKv.getValueOffset(), defaultFamilyNameKv.getValueLength()) : null;
+ Cell viewStatementKv = tableKeyValues[VIEW_STATEMENT_INDEX];
+ String viewStatement = viewStatementKv != null ? (String)PDataType.VARCHAR.toObject(viewStatementKv.getValueArray(), viewStatementKv.getValueOffset(), viewStatementKv.getValueLength()) : null;
+ Cell disableWALKv = tableKeyValues[DISABLE_WAL_INDEX];
+ boolean disableWAL = disableWALKv == null ? PTable.DEFAULT_DISABLE_WAL : Boolean.TRUE.equals(PDataType.BOOLEAN.toObject(disableWALKv.getValueArray(), disableWALKv.getValueOffset(), disableWALKv.getValueLength()));
+ Cell multiTenantKv = tableKeyValues[MULTI_TENANT_INDEX];
+ boolean multiTenant = multiTenantKv == null ? false : Boolean.TRUE.equals(PDataType.BOOLEAN.toObject(multiTenantKv.getValueArray(), multiTenantKv.getValueOffset(), multiTenantKv.getValueLength()));
+ Cell viewTypeKv = tableKeyValues[VIEW_TYPE_INDEX];
+ ViewType viewType = viewTypeKv == null ? null : ViewType.fromSerializedValue(viewTypeKv.getValueArray()[viewTypeKv.getValueOffset()]);
List<PColumn> columns = Lists.newArrayListWithExpectedSize(columnCount);
List<PTable> indexes = new ArrayList<PTable>();
List<PName> physicalTables = new ArrayList<PName>();
while (true) {
- results.clear();
- scanner.next(results);
- if (results.isEmpty()) {
- break;
- }
- KeyValue colKv = results.get(LINK_TYPE_INDEX);
- int colKeyLength = colKv.getRowLength();
- PName colName = newPName(colKv.getBuffer(), colKv.getRowOffset() + offset, colKeyLength-offset);
- int colKeyOffset = offset + colName.getBytes().length + 1;
- PName famName = newPName(colKv.getBuffer(), colKv.getRowOffset() + colKeyOffset, colKeyLength-colKeyOffset);
- if (colName.getString().isEmpty() && famName != null) {
- LinkType linkType = LinkType.fromSerializedValue(colKv.getBuffer()[colKv.getValueOffset()]);
- if (linkType == LinkType.INDEX_TABLE) {
- addIndexToTable(tenantId, schemaName, famName, tableName, clientTimeStamp, indexes);
- } else if (linkType == LinkType.PHYSICAL_TABLE) {
- physicalTables.add(famName);
- } else {
- logger.warn("Unknown link type: " + colKv.getBuffer()[colKv.getValueOffset()] + " for " + SchemaUtil.getTableName(schemaName.getString(), tableName.getString()));
- }
- } else {
- addColumnToTable(results, colName, famName, colKeyValues, columns);
- }
+ results.clear();
+ scanner.next(results);
+ if (results.isEmpty()) {
+ break;
+ }
+ Cell colKv = results.get(LINK_TYPE_INDEX);
+ int colKeyLength = colKv.getRowLength();
+ PName colName = newPName(colKv.getRowArray(), colKv.getRowOffset() + offset, colKeyLength-offset);
+ int colKeyOffset = offset + colName.getBytes().length + 1;
+ PName famName = newPName(colKv.getRowArray(), colKv.getRowOffset() + colKeyOffset, colKeyLength-colKeyOffset);
+ if (colName.getString().isEmpty() && famName != null) {
+ LinkType linkType = LinkType.fromSerializedValue(colKv.getValueArray()[colKv.getValueOffset()]);
+ if (linkType == LinkType.INDEX_TABLE) {
+ addIndexToTable(tenantId, schemaName, famName, tableName, clientTimeStamp, indexes);
+ } else if (linkType == LinkType.PHYSICAL_TABLE) {
+ physicalTables.add(famName);
+ } else {
+ logger.warn("Unknown link type: " + colKv.getValueArray()[colKv.getValueOffset()] + " for " + SchemaUtil.getTableName(schemaName.getString(), tableName.getString()));
+ }
+ } else {
+ addColumnToTable(results, colName, famName, colKeyValues, columns);
+ }
}
- return PTableImpl.makePTable(schemaName, tableName, tableType, indexState, timeStamp, tableSeqNum, pkName, saltBucketNum, columns, tableType == INDEX ? dataTableName : null,
- indexes, isImmutableRows, physicalTables, defaultFamilyName, viewStatement, disableWAL, multiTenant, viewType);
+ return PTableImpl.makePTable(schemaName, tableName, tableType, indexState, timeStamp,
+ tableSeqNum, pkName, saltBucketNum, columns, tableType == INDEX ? dataTableName : null,
+ indexes, isImmutableRows, physicalTables, defaultFamilyName, viewStatement, disableWAL,
+ multiTenant, viewType);
}
- private PTable buildDeletedTable(byte[] key, ImmutableBytesPtr cacheKey, HRegion region, long clientTimeStamp) throws IOException {
+ private PTable buildDeletedTable(byte[] key, ImmutableBytesPtr cacheKey, HRegion region,
+ long clientTimeStamp) throws IOException {
if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) {
return null;
}
-
+
Scan scan = newTableRowsScan(key, clientTimeStamp, HConstants.LATEST_TIMESTAMP);
scan.setFilter(new FirstKeyOnlyFilter());
scan.setRaw(true);
RegionScanner scanner = region.getScanner(scan);
- List<KeyValue> results = Lists.<KeyValue>newArrayList();
+ List<Cell> results = Lists.<Cell> newArrayList();
scanner.next(results);
// HBase ignores the time range on a raw scan (HBASE-7362)
if (!results.isEmpty() && results.get(0).getTimestamp() > clientTimeStamp) {
- KeyValue kv = results.get(0);
- if (kv.isDelete()) {
- Map<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.getEnvironment()).getMetaDataCache();
+ Cell kv = results.get(0);
+ if (kv.getTypeByte() == Type.Delete.getCode()) {
+ Map<ImmutableBytesPtr, PTable> metaDataCache =
+ GlobalCache.getInstance(this.env).getMetaDataCache();
PTable table = newDeletedTableMarker(kv.getTimestamp());
metaDataCache.put(cacheKey, table);
return table;
@@ -467,16 +692,21 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
return table.getName() == null;
}
- private PTable loadTable(RegionCoprocessorEnvironment env, byte[] key, ImmutableBytesPtr cacheKey, long clientTimeStamp, long asOfTimeStamp) throws IOException, SQLException {
+ private PTable loadTable(RegionCoprocessorEnvironment env, byte[] key,
+ ImmutableBytesPtr cacheKey, long clientTimeStamp, long asOfTimeStamp)
+ throws IOException, SQLException {
HRegion region = env.getRegion();
- Map<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.getEnvironment()).getMetaDataCache();
+ Map<ImmutableBytesPtr, PTable> metaDataCache =
+ GlobalCache.getInstance(this.env).getMetaDataCache();
PTable table = metaDataCache.get(cacheKey);
// We always cache the latest version - fault in if not in cache
if (table != null || (table = buildTable(key, cacheKey, region, asOfTimeStamp)) != null) {
return table;
}
- // if not found then check if newer table already exists and add delete marker for timestamp found
- if (table == null && (table=buildDeletedTable(key, cacheKey, region, clientTimeStamp)) != null) {
+ // if not found then check if newer table already exists and add delete marker for timestamp
+ // found
+ if (table == null
+ && (table = buildDeletedTable(key, cacheKey, region, clientTimeStamp)) != null) {
return table;
}
return null;
@@ -484,102 +714,136 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
@Override
- public MetaDataMutationResult createTable(List<Mutation> tableMetadata) throws IOException {
+ public void createTable(RpcController controller, CreateTableRequest request,
+ RpcCallback<MetaDataResponse> done) {
+ MetaDataResponse.Builder builder = MetaDataResponse.newBuilder();
byte[][] rowKeyMetaData = new byte[3][];
- MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata,rowKeyMetaData);
- byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
- byte[] schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
- byte[] tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
-
+ byte[] schemaName = null;
+ byte[] tableName = null;
+
try {
+ List<Mutation> tableMetadata = ProtobufUtil.getMutations(request);
+ MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData);
+ byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
+ schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
+ tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
byte[] parentTableName = MetaDataUtil.getParentTableName(tableMetadata);
byte[] lockTableName = parentTableName == null ? tableName : parentTableName;
byte[] lockKey = SchemaUtil.getTableKey(tenantIdBytes, schemaName, lockTableName);
- byte[] key = parentTableName == null ? lockKey : SchemaUtil.getTableKey(tenantIdBytes, schemaName, tableName);
+ byte[] key =
+ parentTableName == null ? lockKey : SchemaUtil.getTableKey(tenantIdBytes,
+ schemaName, tableName);
byte[] parentKey = parentTableName == null ? null : lockKey;
-
- RegionCoprocessorEnvironment env = getEnvironment();
+
HRegion region = env.getRegion();
MetaDataMutationResult result = checkTableKeyInRegion(lockKey, region);
if (result != null) {
- return result;
+ done.run(MetaDataMutationResult.toProto(result));
+ return;
}
- List<Integer> lids = Lists.newArrayList(5);
+ List<RowLock> locks = Lists.newArrayList();
long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
try {
- acquireLock(region, lockKey, lids);
+ acquireLock(region, lockKey, locks);
if (key != lockKey) {
- acquireLock(region, key, lids);
+ acquireLock(region, key, locks);
}
// Load parent table first
PTable parentTable = null;
ImmutableBytesPtr parentCacheKey = null;
if (parentKey != null) {
parentCacheKey = new ImmutableBytesPtr(parentKey);
- parentTable = loadTable(env, parentKey, parentCacheKey, clientTimeStamp, clientTimeStamp);
+ parentTable =
+ loadTable(env, parentKey, parentCacheKey, clientTimeStamp,
+ clientTimeStamp);
if (parentTable == null || isTableDeleted(parentTable)) {
- return new MetaDataMutationResult(MutationCode.PARENT_TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), parentTable);
+ builder.setReturnCode(MetaDataProtos.MutationCode.PARENT_TABLE_NOT_FOUND);
+ builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+ builder.setTable(PTableImpl.toProto(parentTable));
+ done.run(builder.build());
+ return;
}
// If parent table isn't at the expected sequence number, then return
- if (parentTable.getSequenceNumber() != MetaDataUtil.getParentSequenceNumber(tableMetadata)) {
- return new MetaDataMutationResult(MutationCode.CONCURRENT_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), parentTable);
+ if (parentTable.getSequenceNumber() != MetaDataUtil
+ .getParentSequenceNumber(tableMetadata)) {
+ builder.setReturnCode(MetaDataProtos.MutationCode.CONCURRENT_TABLE_MUTATION);
+ builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+ builder.setTable(PTableImpl.toProto(parentTable));
+ done.run(builder.build());
+ return;
}
}
// Load child table next
ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
- // Get as of latest timestamp so we can detect if we have a newer table that already exists
+ // Get as of latest timestamp so we can detect if we have a newer table that already
+ // exists
// without making an additional query
- PTable table = loadTable(env, key, cacheKey, clientTimeStamp, HConstants.LATEST_TIMESTAMP);
+ PTable table =
+ loadTable(env, key, cacheKey, clientTimeStamp, HConstants.LATEST_TIMESTAMP);
if (table != null) {
if (table.getTimeStamp() < clientTimeStamp) {
- // If the table is older than the client time stamp and it's deleted, continue
+ // If the table is older than the client time stamp and it's deleted,
+ // continue
if (!isTableDeleted(table)) {
- return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, EnvironmentEdgeManager.currentTimeMillis(), table);
+ builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_ALREADY_EXISTS);
+ builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+ builder.setTable(PTableImpl.toProto(table));
+ done.run(builder.build());
+ return;
}
} else {
- return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND, EnvironmentEdgeManager.currentTimeMillis(), table);
+ builder.setReturnCode(MetaDataProtos.MutationCode.NEWER_TABLE_FOUND);
+ builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+ builder.setTable(PTableImpl.toProto(table));
+ done.run(builder.build());
+ return;
}
}
-
- // TODO: Switch this to HRegion#batchMutate when we want to support indexes on the system
+ // TODO: Switch this to HRegion#batchMutate when we want to support indexes on the
+ // system
// table. Basically, we get all the locks that we don't already hold for all the
- // tableMetadata rows. This ensures we don't have deadlock situations (ensuring primary and
- // then index table locks are held, in that order). For now, we just don't support indexing
- // on the system table. This is an issue because of the way we manage batch mutation in the
+ // tableMetadata rows. This ensures we don't have deadlock situations (ensuring
+ // primary and
+ // then index table locks are held, in that order). For now, we just don't support
+ // indexing
+ // on the system table. This is an issue because of the way we manage batch mutation
+ // in the
// Indexer.
- region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]>emptySet());
-
+ region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet());
+
// Invalidate the cache - the next getTable call will add it
- // TODO: consider loading the table that was just created here, patching up the parent table, and updating the cache
- Map<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.getEnvironment()).getMetaDataCache();
+ // TODO: consider loading the table that was just created here, patching up the
+ // parent table, and updating the cache
+ Map<ImmutableBytesPtr, PTable> metaDataCache =
+ GlobalCache.getInstance(this.env).getMetaDataCache();
if (parentCacheKey != null) {
metaDataCache.remove(parentCacheKey);
}
metaDataCache.remove(cacheKey);
// Get timeStamp from mutations - the above method sets it if it's unset
long currentTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
- return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, currentTimeStamp, null);
+ builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_NOT_FOUND);
+ builder.setMutationTime(currentTimeStamp);
+ done.run(builder.build());
+ return;
} finally {
- releaseLocks(region, lids);
+ region.releaseRowLocks(locks);
}
} catch (Throwable t) {
- ServerUtil.throwIOException(SchemaUtil.getTableName(schemaName, tableName), t);
- return null; // impossible
+ logger.error("createTable failed", t);
+ ProtobufUtil.setControllerException(controller,
+ ServerUtil.createIOException(SchemaUtil.getTableName(schemaName, tableName), t));
}
}
- private static void acquireLock(HRegion region, byte[] key, List<Integer> lids) throws IOException {
- Integer lid = region.getLock(null, key, true);
- if (lid == null) {
+
+ private static void acquireLock(HRegion region, byte[] key, List<RowLock> locks)
+ throws IOException {
+ RowLock rowLock = region.getRowLock(key);
+ if (rowLock == null) {
throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key));
}
- lids.add(lid);
- }
-
- private static void releaseLocks(HRegion region, List<Integer> lids) {
- for (Integer lid : lids) {
- region.releaseRowLock(lid);
- }
+ locks.add(rowLock);
}
private static final byte[] PHYSICAL_TABLE_BYTES = new byte[] {PTable.LinkType.PHYSICAL_TABLE.getSerializedValue()};
@@ -610,7 +874,7 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
scan.addColumn(TABLE_FAMILY_BYTES, LINK_TYPE_BYTES);
RegionScanner scanner = region.getScanner(scan);
try {
- List<KeyValue> results = newArrayList();
+ List<Cell> results = newArrayList();
scanner.next(results);
return results.size() > 0;
}
@@ -620,108 +884,141 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
}
@Override
- public MetaDataMutationResult dropTable(List<Mutation> tableMetadata, String tableType) throws IOException {
+ public void dropTable(RpcController controller, DropTableRequest request,
+ RpcCallback<MetaDataResponse> done) {
+ MetaDataResponse.Builder builder = MetaDataResponse.newBuilder();
byte[][] rowKeyMetaData = new byte[3][];
- MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata,rowKeyMetaData);
- byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
- byte[] schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
- byte[] tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
- // Disallow deletion of a system table
- if (tableType.equals(PTableType.SYSTEM.getSerializedValue())) {
- return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), null);
- }
- List<byte[]> tableNamesToDelete = Lists.newArrayList();
+ String tableType = request.getTableType();
+ byte[] schemaName = null;
+ byte[] tableName = null;
+
try {
+ List<Mutation> tableMetadata = ProtobufUtil.getMutations(request);
+ MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData);
+ byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
+ schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
+ tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
+ // Disallow deletion of a system table
+ if (tableType.equals(PTableType.SYSTEM.getSerializedValue())) {
+ builder.setReturnCode(MetaDataProtos.MutationCode.UNALLOWED_TABLE_MUTATION);
+ builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+ done.run(builder.build());
+ return;
+ }
+ List<byte[]> tableNamesToDelete = Lists.newArrayList();
byte[] parentTableName = MetaDataUtil.getParentTableName(tableMetadata);
byte[] lockTableName = parentTableName == null ? tableName : parentTableName;
byte[] lockKey = SchemaUtil.getTableKey(tenantIdBytes, schemaName, lockTableName);
- byte[] key = parentTableName == null ? lockKey : SchemaUtil.getTableKey(tenantIdBytes, schemaName, tableName);
-
- RegionCoprocessorEnvironment env = getEnvironment();
+ byte[] key =
+ parentTableName == null ? lockKey : SchemaUtil.getTableKey(tenantIdBytes,
+ schemaName, tableName);
+
HRegion region = env.getRegion();
MetaDataMutationResult result = checkTableKeyInRegion(key, region);
if (result != null) {
- return result;
+ done.run(MetaDataMutationResult.toProto(result));
+ return;
}
- List<Integer> lids = Lists.newArrayList(5);
+ List<RowLock> locks = Lists.newArrayList();
try {
- acquireLock(region, lockKey, lids);
+ acquireLock(region, lockKey, locks);
if (key != lockKey) {
- acquireLock(region, key, lids);
+ acquireLock(region, key, locks);
}
List<ImmutableBytesPtr> invalidateList = new ArrayList<ImmutableBytesPtr>();
- result = doDropTable(key, tenantIdBytes, schemaName, tableName, PTableType.fromSerializedValue(tableType), tableMetadata, invalidateList, lids, tableNamesToDelete);
- if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS || result.getTable() == null) {
- return result;
+ result =
+ doDropTable(key, tenantIdBytes, schemaName, tableName,
+ PTableType.fromSerializedValue(tableType), tableMetadata,
+ invalidateList, locks, tableNamesToDelete);
+ if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS
+ || result.getTable() == null) {
+ done.run(MetaDataMutationResult.toProto(result));
+ return;
}
- Map<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.getEnvironment()).getMetaDataCache();
+ Map<ImmutableBytesPtr, PTable> metaDataCache =
+ GlobalCache.getInstance(this.env).getMetaDataCache();
// Commit the list of deletion.
- region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]>emptySet());
+ region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet());
long currentTime = MetaDataUtil.getClientTimeStamp(tableMetadata);
- for (ImmutableBytesPtr ckey: invalidateList) {
+ for (ImmutableBytesPtr ckey : invalidateList) {
metaDataCache.put(ckey, newDeletedTableMarker(currentTime));
}
if (parentTableName != null) {
ImmutableBytesPtr parentCacheKey = new ImmutableBytesPtr(lockKey);
metaDataCache.remove(parentCacheKey);
}
- return result;
+ done.run(MetaDataMutationResult.toProto(result));
+ return;
} finally {
- releaseLocks(region, lids);
+ region.releaseRowLocks(locks);
}
} catch (Throwable t) {
- ServerUtil.throwIOException(SchemaUtil.getTableName(schemaName, tableName), t);
- return null; // impossible
+ logger.error("dropTable failed", t);
+ ProtobufUtil.setControllerException(controller,
+ ServerUtil.createIOException(SchemaUtil.getTableName(schemaName, tableName), t));
}
}
- private MetaDataMutationResult doDropTable(byte[] key, byte[] tenantId, byte[] schemaName, byte[] tableName, PTableType tableType,
- List<Mutation> rowsToDelete, List<ImmutableBytesPtr> invalidateList, List<Integer> lids, List<byte[]> tableNamesToDelete) throws IOException, SQLException {
+ private MetaDataMutationResult doDropTable(byte[] key, byte[] tenantId, byte[] schemaName,
+ byte[] tableName, PTableType tableType, List<Mutation> rowsToDelete,
+ List<ImmutableBytesPtr> invalidateList, List<RowLock> locks,
+ List<byte[]> tableNamesToDelete) throws IOException, SQLException {
long clientTimeStamp = MetaDataUtil.getClientTimeStamp(rowsToDelete);
-
- RegionCoprocessorEnvironment env = getEnvironment();
+
HRegion region = env.getRegion();
ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
-
- Map<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.getEnvironment()).getMetaDataCache();
+
+ Map<ImmutableBytesPtr, PTable> metaDataCache =
+ GlobalCache.getInstance(this.env).getMetaDataCache();
PTable table = metaDataCache.get(cacheKey);
-
+
// We always cache the latest version - fault in if not in cache
- if (table != null || (table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP)) != null) {
+ if (table != null
+ || (table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP)) != null) {
if (table.getTimeStamp() < clientTimeStamp) {
// If the table is older than the client time stamp and its deleted, continue
if (isTableDeleted(table)) {
- return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, EnvironmentEdgeManager.currentTimeMillis(), null);
+ return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS,
+ EnvironmentEdgeManager.currentTimeMillis(), null);
}
- if ( tableType != table.getType()) {
+ if (tableType != table.getType()) {
// We said to drop a table, but found a view or visa versa
- return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
+ return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND,
+ EnvironmentEdgeManager.currentTimeMillis(), null);
}
} else {
- return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND, EnvironmentEdgeManager.currentTimeMillis(), table);
+ return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND,
+ EnvironmentEdgeManager.currentTimeMillis(), table);
}
}
if (table == null && buildDeletedTable(key, cacheKey, region, clientTimeStamp) != null) {
- return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
+ return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND,
+ EnvironmentEdgeManager.currentTimeMillis(), null);
}
// Get mutations for main table.
Scan scan = newTableRowsScan(key, MIN_TABLE_TIMESTAMP, clientTimeStamp);
RegionScanner scanner = region.getScanner(scan);
- List<KeyValue> results = Lists.newArrayList();
+ List<Cell> results = Lists.newArrayList();
scanner.next(results);
if (results.isEmpty()) {
- return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
+ return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND,
+ EnvironmentEdgeManager.currentTimeMillis(), null);
}
- KeyValue typeKeyValue = KeyValueUtil.getColumnLatest(results, PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.TABLE_TYPE_BYTES);
- assert(typeKeyValue != null && typeKeyValue.getValueLength() == 1);
- if ( tableType != PTableType.fromSerializedValue(typeKeyValue.getBuffer()[typeKeyValue.getValueOffset()])) {
+ Cell typeKeyValue =
+ KeyValueUtil.getColumnLatest(results, PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+ PhoenixDatabaseMetaData.TABLE_TYPE_BYTES);
+ assert (typeKeyValue != null && typeKeyValue.getValueLength() == 1);
+ if (tableType != PTableType.fromSerializedValue(typeKeyValue.getValueArray()[typeKeyValue
+ .getValueOffset()])) {
// We said to drop a table, but found a view or visa versa
- return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
+ return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND,
+ EnvironmentEdgeManager.currentTimeMillis(), null);
}
// Don't allow a table with views to be deleted
// TODO: support CASCADE with DROP
if (tableType == PTableType.TABLE && hasViews(region, tenantId, table)) {
- return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), null);
+ return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION,
+ EnvironmentEdgeManager.currentTimeMillis(), null);
}
if (table.getType() != PTableType.VIEW) { // Add to list of HTables to delete, unless it's a view
tableNamesToDelete.add(table.getName().getBytes());
@@ -729,141 +1026,178 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
List<byte[]> indexNames = Lists.newArrayList();
invalidateList.add(cacheKey);
byte[][] rowKeyMetaData = new byte[5][];
- byte[] rowKey;
do {
- KeyValue kv = results.get(LINK_TYPE_INDEX);
- rowKey = kv.getRow();
- int nColumns = getVarChars(rowKey, rowKeyMetaData);
- if (nColumns == 5 && rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length == 0 && rowKeyMetaData[PhoenixDatabaseMetaData.INDEX_NAME_INDEX].length > 0
- && Bytes.compareTo(kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength(), LINK_TYPE_BYTES, 0, LINK_TYPE_BYTES.length) == 0
- && LinkType.fromSerializedValue(kv.getBuffer()[kv.getValueOffset()]) == LinkType.INDEX_TABLE) {
+ Cell kv = results.get(LINK_TYPE_INDEX);
+ int nColumns = getVarChars(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), 0, rowKeyMetaData);
+ if (nColumns == 5
+ && rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length == 0
+ && rowKeyMetaData[PhoenixDatabaseMetaData.INDEX_NAME_INDEX].length > 0
+ && Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(), LINK_TYPE_BYTES, 0, LINK_TYPE_BYTES.length) == 0
+ && LinkType.fromSerializedValue(kv.getValueArray()[kv.getValueOffset()]) == LinkType.INDEX_TABLE) {
indexNames.add(rowKeyMetaData[PhoenixDatabaseMetaData.INDEX_NAME_INDEX]);
}
- @SuppressWarnings("deprecation") // FIXME: Remove when unintentionally deprecated method is fixed (HBASE-7870).
+ @SuppressWarnings("deprecation")
+ // FIXME: Remove when unintentionally deprecated method is fixed (HBASE-7870).
// FIXME: the version of the Delete constructor without the lock args was introduced
// in 0.94.4, thus if we try to use it here we can no longer use the 0.94.2 version
// of the client.
- Delete delete = new Delete(rowKey, clientTimeStamp, null);
+ Delete delete = new Delete(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), clientTimeStamp);
rowsToDelete.add(delete);
results.clear();
scanner.next(results);
} while (!results.isEmpty());
-
+
// Recursively delete indexes
for (byte[] indexName : indexNames) {
byte[] indexKey = SchemaUtil.getTableKey(tenantId, schemaName, indexName);
- @SuppressWarnings("deprecation") // FIXME: Remove when unintentionally deprecated method is fixed (HBASE-7870).
+ @SuppressWarnings("deprecation")
+ // FIXME: Remove when unintentionally deprecated method is fixed (HBASE-7870).
// FIXME: the version of the Delete constructor without the lock args was introduced
// in 0.94.4, thus if we try to use it here we can no longer use the 0.94.2 version
// of the client.
- Delete delete = new Delete(indexKey, clientTimeStamp, null);
+ Delete delete = new Delete(indexKey, clientTimeStamp);
rowsToDelete.add(delete);
- acquireLock(region, indexKey, lids);
- MetaDataMutationResult result = doDropTable(indexKey, tenantId, schemaName, indexName, PTableType.INDEX, rowsToDelete, invalidateList, lids, tableNamesToDelete);
- if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS || result.getTable() == null) {
+ acquireLock(region, indexKey, locks);
+ MetaDataMutationResult result =
+ doDropTable(indexKey, tenantId, schemaName, indexName, PTableType.INDEX,
+ rowsToDelete, invalidateList, locks, tableNamesToDelete);
+ if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS
+ || result.getTable() == null) {
return result;
}
}
-
- return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, EnvironmentEdgeManager.currentTimeMillis(), table, tableNamesToDelete);
+
+ return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS,
+ EnvironmentEdgeManager.currentTimeMillis(), table, tableNamesToDelete);
}
private static interface ColumnMutator {
- MetaDataMutationResult updateMutation(PTable table, byte[][] rowKeyMetaData, List<Mutation> tableMetadata, HRegion region, List<ImmutableBytesPtr> invalidateList, List<Integer> lids) throws IOException, SQLException;
+ MetaDataMutationResult updateMutation(PTable table, byte[][] rowKeyMetaData,
+ List<Mutation> tableMetadata, HRegion region,
+ List<ImmutableBytesPtr> invalidateList, List<RowLock> locks) throws IOException,
+ SQLException;
}
- private MetaDataMutationResult mutateColumn(List<Mutation> tableMetadata, ColumnMutator mutator) throws IOException {
+ private MetaDataMutationResult
+ mutateColumn(List<Mutation> tableMetadata, ColumnMutator mutator) throws IOException {
byte[][] rowKeyMetaData = new byte[5][];
- MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata,rowKeyMetaData);
+ MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData);
byte[] tenantId = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
byte[] schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
byte[] tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
try {
- RegionCoprocessorEnvironment env = getEnvironment();
byte[] key = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
HRegion region = env.getRegion();
MetaDataMutationResult result = checkTableKeyInRegion(key, region);
if (result != null) {
- return result;
+ return result;
}
- List<Integer> lids = Lists.newArrayList(5);
+ List<RowLock> locks = Lists.newArrayList();
try {
- acquireLock(region, key, lids);
+ acquireLock(region, key, locks);
ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
List<ImmutableBytesPtr> invalidateList = new ArrayList<ImmutableBytesPtr>();
invalidateList.add(cacheKey);
- Map<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.getEnvironment()).getMetaDataCache();
+ Map<ImmutableBytesPtr, PTable> metaDataCache =
+ GlobalCache.getInstance(this.env).getMetaDataCache();
PTable table = metaDataCache.get(cacheKey);
if (logger.isDebugEnabled()) {
if (table == null) {
- logger.debug("Table " + Bytes.toStringBinary(key) + " not found in cache. Will build through scan");
+ logger.debug("Table " + Bytes.toStringBinary(key)
+ + " not found in cache. Will build through scan");
} else {
- logger.debug("Table " + Bytes.toStringBinary(key) + " found in cache with timestamp " + table.getTimeStamp() + " seqNum " + table.getSequenceNumber());
+ logger.debug("Table " + Bytes.toStringBinary(key)
+ + " found in cache with timestamp " + table.getTimeStamp()
+ + " seqNum " + table.getSequenceNumber());
}
}
// Get client timeStamp from mutations
long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
- if (table == null && (table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP)) == null) {
- // if not found then call newerTableExists and add delete marker for timestamp found
+ if (table == null
+ && (table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP)) == null) {
+ // if not found then check for a delete marker newer than the client timestamp
+ // (buildDeletedTable caches a deleted-table marker when one is found)
if (buildDeletedTable(key, cacheKey, region, clientTimeStamp) != null) {
- return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
+ return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND,
+ EnvironmentEdgeManager.currentTimeMillis(), null);
}
- return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
+ return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND,
+ EnvironmentEdgeManager.currentTimeMillis(), null);
}
if (table.getTimeStamp() >= clientTimeStamp) {
- return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND, EnvironmentEdgeManager.currentTimeMillis(), table);
+ return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND,
+ EnvironmentEdgeManager.currentTimeMillis(), table);
} else if (isTableDeleted(table)) {
- return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
+ return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND,
+ EnvironmentEdgeManager.currentTimeMillis(), null);
}
-
- long expectedSeqNum = MetaDataUtil.getSequenceNumber(tableMetadata) - 1; // lookup TABLE_SEQ_NUM in tableMetaData
+
+ long expectedSeqNum = MetaDataUtil.getSequenceNumber(tableMetadata) - 1; // lookup
+ // TABLE_SEQ_NUM
+ // in
+ // tableMetaData
if (logger.isDebugEnabled()) {
- logger.debug("For table " + Bytes.toStringBinary(key) + " expecting seqNum " + expectedSeqNum + " and found seqNum " + table.getSequenceNumber() + " with " + table.getColumns().size() + " columns: " + table.getColumns());
+ logger.debug("For table " + Bytes.toStringBinary(key) + " expecting seqNum "
+ + expectedSeqNum + " and found seqNum " + table.getSequenceNumber()
+ + " with " + table.getColumns().size() + " columns: "
+ + table.getColumns());
}
if (expectedSeqNum != table.getSequenceNumber()) {
if (logger.isDebugEnabled()) {
- logger.debug("For table " + Bytes.toStringBinary(key) + " returning CONCURRENT_TABLE_MUTATION due to unexpected seqNum");
+ logger.debug("For table " + Bytes.toStringBinary(key)
+ + " returning CONCURRENT_TABLE_MUTATION due to unexpected seqNum");
}
- return new MetaDataMutationResult(MutationCode.CONCURRENT_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), table);
+ return new MetaDataMutationResult(MutationCode.CONCURRENT_TABLE_MUTATION,
+ EnvironmentEdgeManager.currentTimeMillis(), table);
}
-
+
PTableType type = table.getType();
- if (type == PTableType.INDEX) {
+ if (type == PTableType.INDEX) {
// Disallow mutation of an index table
- return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), null);
+ return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION,
+ EnvironmentEdgeManager.currentTimeMillis(), null);
} else {
- PTableType expectedType = MetaDataUtil.getTableType(tableMetadata);
- // We said to drop a table, but found a view or visa versa
- if (type != expectedType) {
- return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
- }
- if (hasViews(region, tenantId, table)) {
- // Disallow any column mutations for parents of tenant tables
- return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), null);
- }
+ PTableType expectedType = MetaDataUtil.getTableType(tableMetadata);
+ // We said to drop a table, but found a view or vice versa
+ if (type != expectedType) {
+ return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
+ }
+ if (hasViews(region, tenantId, table)) {
+ // Disallow any column mutations for parents of tenant tables
+ return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), null);
+ }
}
- result = mutator.updateMutation(table, rowKeyMetaData, tableMetadata, region, invalidateList, lids);
+ result = mutator.updateMutation(table, rowKeyMetaData, tableMetadata, region,
+ invalidateList, locks);
if (result != null) {
return result;
}
-
- region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]>emptySet());
+
+ region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet());
// Invalidate from cache
for (ImmutableBytesPtr invalidateKey : invalidateList) {
PTable invalidatedTable = metaDataCache.remove(invalidateKey);
if (logger.isDebugEnabled()) {
if (invalidatedTable == null) {
- logger.debug("Attempted to invalidated table key " + Bytes.toStringBinary(cacheKey.get(),cacheKey.getOffset(),cacheKey.getLength()) + " but found no cached table");
+ logger.debug("Attempted to invalidated table key "
+ + Bytes.toStringBinary(cacheKey.get(), cacheKey.getOffset(),
+ cacheKey.getLength()) + " but found no cached table");
} else {
- logger.debug("Invalidated table key " + Bytes.toStringBinary(cacheKey.get(),cacheKey.getOffset(),cacheKey.getLength()) + " with timestamp " + invalidatedTable.getTimeStamp() + " and seqNum " + invalidatedTable.getSequenceNumber());
+ logger.debug("Invalidated table key "
+ + Bytes.toStringBinary(cacheKey.get(), cacheKey.getOffset(),
+ cacheKey.getLength()) + " with timestamp "
+ + invalidatedTable.getTimeStamp() + " and seqNum "
+ + invalidatedTable.getSequenceNumber());
}
}
}
- // Get client timeStamp from mutations, since it may get updated by the mutateRowsWithLocks call
+ // Get client timeStamp from mutations, since it may get updated by the
+ // mutateRowsWithLocks call
long currentTime = MetaDataUtil.getClientTimeStamp(tableMetadata);
- return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, currentTime, null);
+ return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, currentTime,
+ null);
} finally {
- releaseLocks(region,lids);
+ region.releaseRowLocks(locks);
}
} catch (Throwable t) {
ServerUtil.throwIOException(SchemaUtil.getTableName(schemaName, tableName), t);
@@ -871,261 +1205,251 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
}
}
+
@Override
- public MetaDataMutationResult addColumn(List<Mutation> tableMetaData) throws IOException {
- return mutateColumn(tableMetaData, new ColumnMutator() {
- @Override
- public MetaDataMutationResult updateMutation(PTable table, byte[][] rowKeyMetaData, List<Mutation> tableMetaData, HRegion region, List<ImmutableBytesPtr> invalidateList, List<Integer> lids) {
- byte[] tenantId = rowKeyMetaData[TENANT_ID_INDEX];
- byte[] schemaName = rowKeyMetaData[SCHEMA_NAME_INDEX];
- byte[] tableName = rowKeyMetaData[TABLE_NAME_INDEX];
- for (Mutation m : tableMetaData) {
- byte[] key = m.getRow();
- boolean addingPKColumn = false;
- int pkCount = getVarChars(key, rowKeyMetaData);
- if (pkCount > COLUMN_NAME_INDEX
- && Bytes.compareTo(schemaName, rowKeyMetaData[SCHEMA_NAME_INDEX]) == 0
- && Bytes.compareTo(tableName, rowKeyMetaData[TABLE_NAME_INDEX]) == 0 ) {
- try {
- if (pkCount > FAMILY_NAME_INDEX && rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX].length > 0) {
- PColumnFamily family = table.getColumnFamily(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]);
- family.getColumn(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
- } else if (pkCount > COLUMN_NAME_INDEX && rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length > 0) {
- addingPKColumn = true;
- table.getPKColumn(new String(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]));
- } else {
- continue;
- }
- return new MetaDataMutationResult(MutationCode.COLUMN_ALREADY_EXISTS, EnvironmentEdgeManager.currentTimeMillis(), table);
- } catch (ColumnFamilyNotFoundException e) {
- continue;
- } catch (ColumnNotFoundException e) {
- if (addingPKColumn) {
- // Add all indexes to invalidate list, as they will all be adding the same PK column
- // No need to lock them, as we have the parent table lock at this point
- for (PTable index : table.getIndexes()) {
- invalidateList.add(new ImmutableBytesPtr(SchemaUtil.getTableKey(tenantId, index.getSchemaName().getBytes(),index.getTableName().getBytes())));
- }
- }
- continue;
- }
- }
- }
- return null;
- }
- });
- }
-
- @Override
- public MetaDataMutationResult dropColumn(List<Mutation> tableMetaData) throws IOException {
- final long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetaData);
- final List<byte[]> tableNamesToDelete = Lists.newArrayList();
- return mutateColumn(tableMetaData, new ColumnMutator() {
- @SuppressWarnings("deprecation")
- @Override
- public MetaDataMutationResult updateMutation(PTable table, byte[][] rowKeyMetaData, List<Mutation> tableMetaData, HRegion region, List<ImmutableBytesPtr> invalidateList, List<Integer> lids) throws IOException, SQLException {
- byte[] tenantId = rowKeyMetaData[TENANT_ID_INDEX];
- byte[] schemaName = rowKeyMetaData[SCHEMA_NAME_INDEX];
- byte[] tableName = rowKeyMetaData[TABLE_NAME_INDEX];
- boolean deletePKColumn = false;
- List<Mutation> additionalTableMetaData = Lists.newArrayList();
- for (Mutation m : tableMetaData) {
- if (m instanceof Delete) {
+ public void addColumn(RpcController controller, AddColumnRequest request,
+ RpcCallback<MetaDataResponse> done) {
+ try {
+ List<Mutation> tableMetaData = ProtobufUtil.getMutations(request);
+ MetaDataMutationResult result = mutateColumn(tableMetaData, new ColumnMutator() {
+ @Override
+ public MetaDataMutationResult updateMutation(PTable table, byte[][] rowKeyMetaData,
+ List<Mutation> tableMetaData, HRegion region,
+ List<ImmutableBytesPtr> invalidateList, List<RowLock> locks) {
+ byte[] tenantId = rowKeyMetaData[TENANT_ID_INDEX];
+ byte[] schemaName = rowKeyMetaData[SCHEMA_NAME_INDEX];
+ byte[] tableName = rowKeyMetaData[TABLE_NAME_INDEX];
+ for (Mutation m : tableMetaData) {
byte[] key = m.getRow();
+ boolean addingPKColumn = false;
int pkCount = getVarChars(key, rowKeyMetaData);
- if (pkCount > COLUMN_NAME_INDEX
- && Bytes.compareTo(schemaName, rowKeyMetaData[SCHEMA_NAME_INDEX]) == 0
- && Bytes.compareTo(tableName, rowKeyMetaData[TABLE_NAME_INDEX]) == 0 ) {
- PColumn columnToDelete = null;
+ if (pkCount > COLUMN_NAME_INDEX
+ && Bytes.compareTo(schemaName, rowKeyMetaData[SCHEMA_NAME_INDEX]) == 0
+ && Bytes.compareTo(tableName, rowKeyMetaData[TABLE_NAME_INDEX]) == 0) {
try {
- if (pkCount > FAMILY_NAME_INDEX && rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX].length > 0) {
- PColumnFamily family = table.getColumnFamily(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]);
- columnToDelete = family.getColumn(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
- } else if (pkCount > COLUMN_NAME_INDEX && rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length > 0) {
- deletePKColumn = true;
- columnToDelete = table.getPKColumn(new String(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]));
+ if (pkCount > FAMILY_NAME_INDEX
+ && rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX].length > 0) {
+ PColumnFamily family =
+ table.getColumnFamily(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]);
+ family.getColumn(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
+ } else if (pkCount > COLUMN_NAME_INDEX
+ && rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length > 0) {
+ addingPKColumn = true;
+ table.getPKColumn(new String(
+ rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]));
} else {
continue;
}
- // Look for columnToDelete in any indexes. If found as PK column, get lock and drop the index. If found as covered column, delete from index (do this client side?).
- // In either case, invalidate index if the column is in it
-
<TRUNCATED>