You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/07/20 19:14:48 UTC
[18/50] [abbrv] phoenix git commit: PHOENIX-1954 Reserve chunks of
numbers for a sequence (Jan Fernando)
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 69520b0..a9f455a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -135,6 +135,7 @@ import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.ReadOnlyTableException;
import org.apache.phoenix.schema.SaltingUtil;
import org.apache.phoenix.schema.Sequence;
+import org.apache.phoenix.schema.SequenceAllocation;
import org.apache.phoenix.schema.SequenceKey;
import org.apache.phoenix.schema.TableAlreadyExistsException;
import org.apache.phoenix.schema.TableNotFoundException;
@@ -2272,8 +2273,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
* Verifies that sequences exist and reserves values for them if reserveValues is true
*/
@Override
- public void validateSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values, SQLException[] exceptions, Sequence.ValueOp action) throws SQLException {
- incrementSequenceValues(sequenceKeys, timestamp, values, exceptions, action);
+ public void validateSequences(List<SequenceAllocation> sequenceAllocations, long timestamp, long[] values, SQLException[] exceptions, Sequence.ValueOp action) throws SQLException {
+ incrementSequenceValues(sequenceAllocations, timestamp, values, exceptions, action);
}
/**
@@ -2286,14 +2287,15 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
*
*/
@Override
- public void incrementSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values, SQLException[] exceptions) throws SQLException {
- incrementSequenceValues(sequenceKeys, timestamp, values, exceptions, Sequence.ValueOp.INCREMENT_SEQUENCE);
+ public void incrementSequences(List<SequenceAllocation> sequenceAllocations, long timestamp, long[] values, SQLException[] exceptions) throws SQLException {
+ incrementSequenceValues(sequenceAllocations, timestamp, values, exceptions, Sequence.ValueOp.INCREMENT_SEQUENCE);
}
@SuppressWarnings("deprecation")
- private void incrementSequenceValues(List<SequenceKey> keys, long timestamp, long[] values, SQLException[] exceptions, Sequence.ValueOp op) throws SQLException {
- List<Sequence> sequences = Lists.newArrayListWithExpectedSize(keys.size());
- for (SequenceKey key : keys) {
+ private void incrementSequenceValues(List<SequenceAllocation> sequenceAllocations, long timestamp, long[] values, SQLException[] exceptions, Sequence.ValueOp op) throws SQLException {
+ List<Sequence> sequences = Lists.newArrayListWithExpectedSize(sequenceAllocations.size());
+ for (SequenceAllocation sequenceAllocation : sequenceAllocations) {
+ SequenceKey key = sequenceAllocation.getSequenceKey();
Sequence newSequences = new Sequence(key);
Sequence sequence = sequenceMap.putIfAbsent(key, newSequences);
if (sequence == null) {
@@ -2312,11 +2314,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
for (int i = 0; i < sequences.size(); i++) {
Sequence sequence = sequences.get(i);
try {
- values[i] = sequence.incrementValue(timestamp, op);
+ values[i] = sequence.incrementValue(timestamp, op, sequenceAllocations.get(i).getNumAllocations());
} catch (EmptySequenceCacheException e) {
indexes[toIncrementList.size()] = i;
toIncrementList.add(sequence);
- Increment inc = sequence.newIncrement(timestamp, op);
+ Increment inc = sequence.newIncrement(timestamp, op, sequenceAllocations.get(i).getNumAllocations());
incrementBatch.add(inc);
} catch (SQLException e) {
exceptions[i] = e;
@@ -2355,7 +2357,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
Sequence sequence = toIncrementList.get(i);
Result result = (Result)resultObjects[i];
try {
- values[indexes[i]] = sequence.incrementValue(result, op);
+ long numToAllocate = Bytes.toLong(incrementBatch.get(i).getAttribute(SequenceRegionObserver.NUM_TO_ALLOCATE));
+ values[indexes[i]] = sequence.incrementValue(result, op, numToAllocate);
} catch (SQLException e) {
exceptions[indexes[i]] = e;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 4d582be..3fa0c1e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -66,6 +66,7 @@ import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.Sequence;
+import org.apache.phoenix.schema.SequenceAllocation;
import org.apache.phoenix.schema.SequenceAlreadyExistsException;
import org.apache.phoenix.schema.SequenceInfo;
import org.apache.phoenix.schema.SequenceKey;
@@ -385,13 +386,13 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
}
@Override
- public void validateSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values,
- SQLException[] exceptions, Sequence.ValueOp action) throws SQLException {
+ public void validateSequences(List<SequenceAllocation> sequenceAllocations, long timestamp,
+ long[] values, SQLException[] exceptions, Sequence.ValueOp action) throws SQLException {
int i = 0;
- for (SequenceKey key : sequenceKeys) {
- SequenceInfo info = sequenceMap.get(key);
+ for (SequenceAllocation sequenceAllocation : sequenceAllocations) {
+ SequenceInfo info = sequenceMap.get(sequenceAllocation.getSequenceKey());
if (info == null) {
- exceptions[i] = new SequenceNotFoundException(key.getSchemaName(), key.getSequenceName());
+ exceptions[i] = new SequenceNotFoundException(sequenceAllocation.getSequenceKey().getSchemaName(), sequenceAllocation.getSequenceKey().getSequenceName());
} else {
values[i] = info.sequenceValue;
}
@@ -400,10 +401,11 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
}
@Override
- public void incrementSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values,
- SQLException[] exceptions) throws SQLException {
+ public void incrementSequences(List<SequenceAllocation> sequenceAllocations, long timestamp,
+ long[] values, SQLException[] exceptions) throws SQLException {
int i = 0;
- for (SequenceKey key : sequenceKeys) {
+ for (SequenceAllocation sequenceAllocation : sequenceAllocations) {
+ SequenceKey key = sequenceAllocation.getSequenceKey();
SequenceInfo info = sequenceMap.get(key);
if (info == null) {
exceptions[i] = new SequenceNotFoundException(
@@ -429,7 +431,7 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
i = 0;
for (SQLException e : exceptions) {
if (e != null) {
- sequenceMap.remove(sequenceKeys.get(i));
+ sequenceMap.remove(sequenceAllocations.get(i).getSequenceKey());
}
i++;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index 2a98cd5..4153652 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -42,6 +42,7 @@ import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.Sequence;
+import org.apache.phoenix.schema.SequenceAllocation;
import org.apache.phoenix.schema.SequenceKey;
import org.apache.phoenix.schema.stats.PTableStats;
@@ -183,15 +184,15 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
}
@Override
- public void validateSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values,
- SQLException[] exceptions, Sequence.ValueOp action) throws SQLException {
- getDelegate().validateSequences(sequenceKeys, timestamp, values, exceptions, action);
+ public void validateSequences(List<SequenceAllocation> sequenceAllocations, long timestamp,
+ long[] values, SQLException[] exceptions, Sequence.ValueOp action) throws SQLException {
+ getDelegate().validateSequences(sequenceAllocations, timestamp, values, exceptions, action);
}
@Override
- public void incrementSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values,
- SQLException[] exceptions) throws SQLException {
- getDelegate().incrementSequences(sequenceKeys, timestamp, values, exceptions);
+ public void incrementSequences(List<SequenceAllocation> sequenceAllocations, long timestamp,
+ long[] values, SQLException[] exceptions) throws SQLException {
+ getDelegate().incrementSequences(sequenceAllocations, timestamp, values, exceptions);
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 9e74d2a..9d0c1aa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1255,7 +1255,7 @@ public class MetaDataClient {
dataTable.getTimeStamp());
long[] seqValues = new long[1];
SQLException[] sqlExceptions = new SQLException[1];
- connection.getQueryServices().incrementSequences(Collections.singletonList(key),
+ connection.getQueryServices().incrementSequences(Collections.singletonList(new SequenceAllocation(key, 1)),
Math.max(timestamp, dataTable.getTimeStamp()), seqValues, sqlExceptions);
if (sqlExceptions[0] != null) {
throw sqlExceptions[0];
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
index aeba58b..adca5e8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
@@ -62,7 +62,7 @@ import com.google.common.math.LongMath;
public class Sequence {
public static final int SUCCESS = 0;
- public enum ValueOp {VALIDATE_SEQUENCE, RESERVE_SEQUENCE, INCREMENT_SEQUENCE};
+ public enum ValueOp {VALIDATE_SEQUENCE, INCREMENT_SEQUENCE};
public enum MetaOp {CREATE_SEQUENCE, DROP_SEQUENCE, RETURN_SEQUENCE};
// create empty Sequence key values used while created a sequence row
@@ -144,7 +144,7 @@ public class Sequence {
return value.isDeleted ? null : value;
}
- private long increment(SequenceValue value, ValueOp op) throws SQLException {
+ private long increment(SequenceValue value, ValueOp op, long numToAllocate) throws SQLException {
boolean increasingSeq = value.incrementBy > 0 && op != ValueOp.VALIDATE_SEQUENCE;
// check if the the sequence has already reached the min/max limit
if (value.limitReached && op != ValueOp.VALIDATE_SEQUENCE) {
@@ -165,7 +165,8 @@ public class Sequence {
boolean overflowOrUnderflow=false;
// advance currentValue while checking for overflow
try {
- value.currentValue = LongMath.checkedAdd(value.currentValue, value.incrementBy);
+ // advance by numToAllocate * the increment amount
+ value.currentValue = LongMath.checkedAdd(value.currentValue, numToAllocate * value.incrementBy);
} catch (ArithmeticException e) {
overflowOrUnderflow = true;
}
@@ -180,18 +181,92 @@ public class Sequence {
return returnValue;
}
- public long incrementValue(long timestamp, ValueOp op) throws SQLException {
+ public long incrementValue(long timestamp, ValueOp op, long numToAllocate) throws SQLException {
SequenceValue value = findSequenceValue(timestamp);
if (value == null) {
throw EMPTY_SEQUENCE_CACHE_EXCEPTION;
}
- if (value.currentValue == value.nextValue) {
+
+ if (isSequenceCacheExhausted(numToAllocate, value)) {
if (op == ValueOp.VALIDATE_SEQUENCE) {
return value.currentValue;
}
throw EMPTY_SEQUENCE_CACHE_EXCEPTION;
- }
- return increment(value, op);
+ }
+ return increment(value, op, numToAllocate);
+ }
+
+ /**
+ * This method first checks whether value.currentValue = value.nextValue, this check is what
+ * determines whether we need to refresh the cache when evaluating NEXT VALUE FOR. Once
+ * current value reaches the next value we know the cache is exhausted as we give sequence
+ * values out one at a time.
+ *
+ * However for bulk allocations, evaluated by NEXT <n> VALUE FOR, we need a different check
+ * @see isSequenceCacheExhaustedForBulkAllocation
+ *
+ * Using the bulk allocation method for determining if the cache is exhausted for both cases
+ * works in most of the cases, however when dealing with CYCLEs and overflow and underflow, things
+ * break down due to things like sign changes that can happen if we overflow from a positive to
+ * a negative number and vice versa. Therefore, leaving both checks in place.
+ *
+ */
+ private boolean isSequenceCacheExhausted(final long numToAllocate, final SequenceValue value) throws SQLException {
+ return value.currentValue == value.nextValue || (SequenceUtil.isBulkAllocation(numToAllocate) && isSequenceCacheExhaustedForBulkAllocation(numToAllocate, value));
+ }
+
+ /**
+ * This method checks whether there are sufficient values in the SequenceValue
+ * cached on the client to allocate the requested number of slots. It handles
+ * decreasing and increasing sequences as well as any overflows or underflows
+ * encountered.
+ */
+ private boolean isSequenceCacheExhaustedForBulkAllocation(final long numToAllocate, final SequenceValue value) throws SQLException {
+ long targetSequenceValue;
+
+ performValidationForBulkAllocation(numToAllocate, value);
+
+ try {
+ targetSequenceValue = LongMath.checkedAdd(value.currentValue, numToAllocate * value.incrementBy);
+ } catch (ArithmeticException e) {
+ // checkedAdd threw, so advancing by the requested amount would over/underflow.
+ // We don't treat this as the cache being exhausted as the current value may be valid in the case
+ // of no cycle, logic in increment() will take care of detecting we've hit the limit of the sequence
+ return false;
+ }
+
+ if (value.incrementBy > 0) {
+ return targetSequenceValue > value.nextValue;
+ } else {
+ return targetSequenceValue < value.nextValue;
+ }
+ }
+
+ /**
+ * @throws SQLException with the correct error code if sequence limit is reached with
+ * this request for allocation or we attempt to perform a bulk allocation on a sequence
+ * with cycles.
+ */
+ private void performValidationForBulkAllocation(final long numToAllocate, final SequenceValue value)
+ throws SQLException {
+ boolean increasingSeq = value.incrementBy > 0 ? true : false;
+
+ // We don't support Bulk Allocations on sequences that have the CYCLE flag set to true
+ // Check for this here so we fail on expression evaluation and don't allow corner case
+ // whereby a client requests less than cached number of slots on sequence with cycle to succeed
+ if (value.cycle && !SequenceUtil.isCycleAllowed(numToAllocate)) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.NUM_SEQ_TO_ALLOCATE_NOT_SUPPORTED)
+ .setSchemaName(key.getSchemaName())
+ .setTableName(key.getSequenceName())
+ .build().buildException();
+ }
+
+ if (SequenceUtil.checkIfLimitReached(value.currentValue, value.minValue, value.maxValue, value.incrementBy, value.cacheSize, numToAllocate)) {
+ throw new SQLExceptionInfo.Builder(SequenceUtil.getLimitReachedErrorCode(increasingSeq))
+ .setSchemaName(key.getSchemaName())
+ .setTableName(key.getSequenceName())
+ .build().buildException();
+ }
}
public List<Append> newReturns() {
@@ -249,7 +324,7 @@ public class Sequence {
return key;
}
- public long incrementValue(Result result, ValueOp op) throws SQLException {
+ public long incrementValue(Result result, ValueOp op, long numToAllocate) throws SQLException {
// In this case, we don't definitely know the timestamp of the deleted sequence,
// but we know anything older is likely deleted. Worse case, we remove a sequence
// from the cache that we shouldn't have which will cause a gap in sequence values.
@@ -270,19 +345,21 @@ public class Sequence {
.build().buildException();
}
// If we found the sequence, we update our cache with the new value
- SequenceValue value = new SequenceValue(result, op);
+ SequenceValue value = new SequenceValue(result, op, numToAllocate);
insertSequenceValue(value);
- return increment(value, op);
+ return increment(value, op, numToAllocate);
}
+
@SuppressWarnings("deprecation")
- public Increment newIncrement(long timestamp, Sequence.ValueOp action) {
+ public Increment newIncrement(long timestamp, Sequence.ValueOp action, long numToAllocate) {
Increment inc = new Increment(key.getKey());
// It doesn't matter what we set the amount too - we always use the values we get
// from the Get we do to prevent any race conditions. All columns that get added
// are returned with their current value
try {
inc.setTimeRange(MetaDataProtocol.MIN_TABLE_TIMESTAMP, timestamp);
+ inc.setAttribute(SequenceRegionObserver.NUM_TO_ALLOCATE, Bytes.toBytes(numToAllocate));
} catch (IOException e) {
throw new RuntimeException(e); // Impossible
}
@@ -413,7 +490,7 @@ public class Sequence {
return this.incrementBy == 0;
}
- public SequenceValue(Result r, ValueOp op) {
+ public SequenceValue(Result r, ValueOp op, long numToAllocate) {
KeyValue currentValueKV = getCurrentValueKV(r);
KeyValue incrementByKV = getIncrementByKV(r);
KeyValue cacheSizeKV = getCacheSizeKV(r);
@@ -429,8 +506,12 @@ public class Sequence {
this.cycle = (Boolean) PBoolean.INSTANCE.toObject(cycleKV.getValueArray(), cycleKV.getValueOffset(), cycleKV.getValueLength());
this.limitReached = false;
currentValue = nextValue;
+
if (op != ValueOp.VALIDATE_SEQUENCE) {
- currentValue -= incrementBy * cacheSize;
+ // We can't just take the max of numToAllocate and cacheSize
+ // We need to handle a valid edgecase where a client requests bulk allocation of
+ // a number of slots that are less than cache size of the sequence
+ currentValue -= incrementBy * (SequenceUtil.isBulkAllocation(numToAllocate) ? numToAllocate : cacheSize);
}
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceAllocation.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceAllocation.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceAllocation.java
new file mode 100644
index 0000000..afb4a20
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceAllocation.java
@@ -0,0 +1,48 @@
+package org.apache.phoenix.schema;
+
+/**
+ * A SequenceKey and the number of slots requested to be allocated for the sequence.
+ * It binds these two together to allow operations such as sorting
+ * a Collection of SequenceKeys and at the same time preserving the associated requested
+ * number of slots to allocate.
+ *
+ * This class delegates hashCode, equals and compareTo to {@link SequenceKey}.
+ *
+ */
+public class SequenceAllocation implements Comparable<SequenceAllocation> {
+
+ private final SequenceKey sequenceKey;
+ private final long numAllocations;
+
+ public SequenceAllocation(SequenceKey sequenceKey, long numAllocations) {
+ this.sequenceKey = sequenceKey;
+ this.numAllocations = numAllocations;
+ }
+
+
+ public SequenceKey getSequenceKey() {
+ return sequenceKey;
+ }
+
+
+ public long getNumAllocations() {
+ return numAllocations;
+ }
+
+
+ @Override
+ public int hashCode() {
+ return sequenceKey.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) { // equal iff the wrapped sequence keys are equal; numAllocations is ignored, matching hashCode/compareTo
+ return obj instanceof SequenceAllocation && sequenceKey.equals(((SequenceAllocation) obj).sequenceKey); // compare key to key, never key to wrapper
+ }
+
+ @Override
+ public int compareTo(SequenceAllocation that) {
+ return sequenceKey.compareTo(that.sequenceKey);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java
index f97d565..acf1864 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java
@@ -16,6 +16,7 @@ import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.schema.SequenceInfo;
+import com.google.common.base.Preconditions;
import com.google.common.math.LongMath;
/**
@@ -23,17 +24,24 @@ import com.google.common.math.LongMath;
*/
public class SequenceUtil {
+ public static final long DEFAULT_NUM_SLOTS_TO_ALLOCATE = 1L;
+
/**
- * Returns the nextValue of a sequence
- * @throws SQLException if cycle is false and the sequence limit has been reached
+ * @return true if the limit of a sequence has been reached.
*/
public static boolean checkIfLimitReached(long currentValue, long minValue, long maxValue,
- long incrementBy, long cacheSize) throws SQLException {
+ long incrementBy, long cacheSize, long numToAllocate) {
long nextValue = 0;
boolean increasingSeq = incrementBy > 0 ? true : false;
// advance currentValue while checking for overflow
try {
- long incrementValue = LongMath.checkedMultiply(incrementBy, cacheSize);
+ long incrementValue;
+ if (isBulkAllocation(numToAllocate)) {
+ // For bulk allocation we increment independent of cache size
+ incrementValue = LongMath.checkedMultiply(incrementBy, numToAllocate);
+ } else {
+ incrementValue = LongMath.checkedMultiply(incrementBy, cacheSize);
+ }
nextValue = LongMath.checkedAdd(currentValue, incrementValue);
} catch (ArithmeticException e) {
return true;
@@ -46,9 +54,28 @@ public class SequenceUtil {
}
return false;
}
+
+ public static boolean checkIfLimitReached(long currentValue, long minValue, long maxValue,
+ long incrementBy, long cacheSize) throws SQLException {
+ return checkIfLimitReached(currentValue, minValue, maxValue, incrementBy, cacheSize, DEFAULT_NUM_SLOTS_TO_ALLOCATE);
+ }
public static boolean checkIfLimitReached(SequenceInfo info) throws SQLException {
- return checkIfLimitReached(info.sequenceValue, info.minValue, info.maxValue, info.incrementBy, info.cacheSize);
+ return checkIfLimitReached(info.sequenceValue, info.minValue, info.maxValue, info.incrementBy, info.cacheSize, DEFAULT_NUM_SLOTS_TO_ALLOCATE);
+ }
+
+ /**
+ * Returns true if the value of numToAllocate signals that a bulk allocation of sequence slots
+ * was requested. Prevents proliferation of same comparison in many places throughout the code.
+ */
+ public static boolean isBulkAllocation(long numToAllocate) {
+ Preconditions.checkArgument(numToAllocate > 0);
+ return numToAllocate > DEFAULT_NUM_SLOTS_TO_ALLOCATE;
+ }
+
+ public static boolean isCycleAllowed(long numToAllocate) {
+ return !isBulkAllocation(numToAllocate);
+
}
/**
@@ -59,5 +86,15 @@ public class SequenceUtil {
return new SQLExceptionInfo.Builder(code).setSchemaName(schemaName).setTableName(tableName)
.build().buildException();
}
+
+ /**
+ * Returns the correct instance of SQLExceptionCode when we detect a limit has been reached,
+ * depending upon whether a min or max value caused the limit to be exceeded.
+ */
+ public static SQLExceptionCode getLimitReachedErrorCode(boolean increasingSeq) {
+ SQLExceptionCode code = increasingSeq ? SQLExceptionCode.SEQUENCE_VAL_REACHED_MAX_VALUE
+ : SQLExceptionCode.SEQUENCE_VAL_REACHED_MIN_VALUE;
+ return code;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/test/java/org/apache/phoenix/schema/SequenceAllocationTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/schema/SequenceAllocationTest.java b/phoenix-core/src/test/java/org/apache/phoenix/schema/SequenceAllocationTest.java
new file mode 100644
index 0000000..4a825f2
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/schema/SequenceAllocationTest.java
@@ -0,0 +1,59 @@
+package org.apache.phoenix.schema;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class SequenceAllocationTest {
+
+ @Test
+ /**
+ * Validates that sorting a List of SequenceAllocation instances
+ * results in the same sort order as sorting SequenceKey instances.
+ */
+ public void testSortingSequenceAllocation() {
+
+ // Arrange
+ SequenceKey sequenceKey1 = new SequenceKey(null, "seqalloc", "sequenceC",QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
+ SequenceKey sequenceKey2 = new SequenceKey(null, "seqalloc", "sequenceB",QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
+ SequenceKey sequenceKey3 = new SequenceKey(null, "seqalloc", "sequenceA",QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
+ List<SequenceKey> sequenceKeys = Lists.newArrayList(sequenceKey1, sequenceKey2, sequenceKey3);
+ List<SequenceAllocation> sequenceAllocations = Lists.newArrayList(new SequenceAllocation(sequenceKey2, 1), new SequenceAllocation(sequenceKey1, 1), new SequenceAllocation(sequenceKey3, 1));
+
+ // Act
+ Collections.sort(sequenceKeys);
+ Collections.sort(sequenceAllocations);
+
+ // Assert
+ int i = 0;
+ for (SequenceKey sequenceKey : sequenceKeys) {
+ assertEquals(sequenceKey, sequenceAllocations.get(i).getSequenceKey());
+ i++;
+ }
+ }
+
+ @Test
+ public void testSortingSequenceAllocationPreservesAllocations() {
+
+ // Arrange
+ SequenceKey sequenceKeyC = new SequenceKey(null, "seqalloc", "sequenceC",QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
+ SequenceKey sequenceKeyB = new SequenceKey(null, "seqalloc", "sequenceB",QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
+ SequenceKey sequenceKeyA = new SequenceKey(null, "seqalloc", "sequenceA",QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
+ List<SequenceAllocation> sequenceAllocations = Lists.newArrayList(new SequenceAllocation(sequenceKeyB, 15), new SequenceAllocation(sequenceKeyC, 11), new SequenceAllocation(sequenceKeyA, 1000));
+
+ // Act
+ Collections.sort(sequenceAllocations);
+
+ // Assert
+ assertEquals("sequenceA",sequenceAllocations.get(0).getSequenceKey().getSequenceName());
+ assertEquals(1000,sequenceAllocations.get(0).getNumAllocations());
+ assertEquals(15,sequenceAllocations.get(1).getNumAllocations());
+ assertEquals(11,sequenceAllocations.get(2).getNumAllocations());
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java
index f25a213..2abc482 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java
@@ -62,4 +62,58 @@ public class SequenceUtilTest {
public void testDescendingOverflowCycle() throws SQLException {
assertTrue(SequenceUtil.checkIfLimitReached(Long.MIN_VALUE, Long.MIN_VALUE, 0, -1/* incrementBy */, CACHE_SIZE));
}
+
+ @Test
+ public void testBulkAllocationAscendingNextValueGreaterThanMax() throws SQLException {
+ assertTrue(SequenceUtil.checkIfLimitReached(MAX_VALUE, MIN_VALUE, MAX_VALUE, 2/* incrementBy */, CACHE_SIZE, 1));
+ }
+
+ @Test
+ public void testBulkAllocationAscendingNextValueReachLimit() throws SQLException {
+ assertFalse(SequenceUtil.checkIfLimitReached(6, MIN_VALUE, MAX_VALUE, 2/* incrementBy */, CACHE_SIZE, 2));
+ }
+
+ @Test
+ public void testBulkAllocationAscendingNextValueWithinLimit() throws SQLException {
+ assertFalse(SequenceUtil.checkIfLimitReached(5, MIN_VALUE, MAX_VALUE, 2/* incrementBy */, CACHE_SIZE, 2));
+
+ }
+
+ @Test
+ public void testBulkAllocationAscendingOverflow() throws SQLException {
+ assertTrue(SequenceUtil.checkIfLimitReached(Long.MAX_VALUE, 0, Long.MAX_VALUE, 1/* incrementBy */, CACHE_SIZE, 100));
+ }
+
+
+ @Test
+ public void testBulkAllocationDescendingNextValueLessThanMax() throws SQLException {
+ assertTrue(SequenceUtil.checkIfLimitReached(10, MIN_VALUE, MAX_VALUE, -2/* incrementBy */, CACHE_SIZE, 5));
+ }
+
+ @Test
+ public void testBulkAllocationDescendingNextValueReachLimit() throws SQLException {
+ assertFalse(SequenceUtil.checkIfLimitReached(7, MIN_VALUE, MAX_VALUE, -2/* incrementBy */, CACHE_SIZE, 3));
+ }
+
+ @Test
+ public void testBulkAllocationDescendingNextValueWithinLimit() throws SQLException {
+ assertFalse(SequenceUtil.checkIfLimitReached(8, MIN_VALUE, MAX_VALUE, -2/* incrementBy */, CACHE_SIZE, 2));
+
+ }
+
+ @Test
+ public void testBulkAllocationDescendingOverflowCycle() throws SQLException {
+ assertTrue(SequenceUtil.checkIfLimitReached(Long.MIN_VALUE, Long.MIN_VALUE, 0, -1/* incrementBy */, CACHE_SIZE, 100));
+ }
+
+ @Test
+ public void testIsCycleAllowedForBulkAllocation() {
+ assertFalse(SequenceUtil.isCycleAllowed(2));
+ }
+
+ @Test
+ public void testIsCycleAllowedForStandardAllocation() {
+ assertTrue(SequenceUtil.isCycleAllowed(1));
+ }
+
}