You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2015/07/10 05:56:58 UTC
[1/2] phoenix git commit: PHOENIX-1954 Reserve chunks of numbers for
a sequence (Jan Fernando)
Repository: phoenix
Updated Branches:
refs/heads/master 984e62223 -> 3b1bfa0d7
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 69520b0..a9f455a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -135,6 +135,7 @@ import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.ReadOnlyTableException;
import org.apache.phoenix.schema.SaltingUtil;
import org.apache.phoenix.schema.Sequence;
+import org.apache.phoenix.schema.SequenceAllocation;
import org.apache.phoenix.schema.SequenceKey;
import org.apache.phoenix.schema.TableAlreadyExistsException;
import org.apache.phoenix.schema.TableNotFoundException;
@@ -2272,8 +2273,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
* Verifies that sequences exist and reserves values for them if reserveValues is true
*/
@Override
- public void validateSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values, SQLException[] exceptions, Sequence.ValueOp action) throws SQLException {
- incrementSequenceValues(sequenceKeys, timestamp, values, exceptions, action);
+ public void validateSequences(List<SequenceAllocation> sequenceAllocations, long timestamp, long[] values, SQLException[] exceptions, Sequence.ValueOp action) throws SQLException {
+ incrementSequenceValues(sequenceAllocations, timestamp, values, exceptions, action);
}
/**
@@ -2286,14 +2287,15 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
*
*/
@Override
- public void incrementSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values, SQLException[] exceptions) throws SQLException {
- incrementSequenceValues(sequenceKeys, timestamp, values, exceptions, Sequence.ValueOp.INCREMENT_SEQUENCE);
+ public void incrementSequences(List<SequenceAllocation> sequenceAllocations, long timestamp, long[] values, SQLException[] exceptions) throws SQLException {
+ incrementSequenceValues(sequenceAllocations, timestamp, values, exceptions, Sequence.ValueOp.INCREMENT_SEQUENCE);
}
@SuppressWarnings("deprecation")
- private void incrementSequenceValues(List<SequenceKey> keys, long timestamp, long[] values, SQLException[] exceptions, Sequence.ValueOp op) throws SQLException {
- List<Sequence> sequences = Lists.newArrayListWithExpectedSize(keys.size());
- for (SequenceKey key : keys) {
+ private void incrementSequenceValues(List<SequenceAllocation> sequenceAllocations, long timestamp, long[] values, SQLException[] exceptions, Sequence.ValueOp op) throws SQLException {
+ List<Sequence> sequences = Lists.newArrayListWithExpectedSize(sequenceAllocations.size());
+ for (SequenceAllocation sequenceAllocation : sequenceAllocations) {
+ SequenceKey key = sequenceAllocation.getSequenceKey();
Sequence newSequences = new Sequence(key);
Sequence sequence = sequenceMap.putIfAbsent(key, newSequences);
if (sequence == null) {
@@ -2312,11 +2314,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
for (int i = 0; i < sequences.size(); i++) {
Sequence sequence = sequences.get(i);
try {
- values[i] = sequence.incrementValue(timestamp, op);
+ values[i] = sequence.incrementValue(timestamp, op, sequenceAllocations.get(i).getNumAllocations());
} catch (EmptySequenceCacheException e) {
indexes[toIncrementList.size()] = i;
toIncrementList.add(sequence);
- Increment inc = sequence.newIncrement(timestamp, op);
+ Increment inc = sequence.newIncrement(timestamp, op, sequenceAllocations.get(i).getNumAllocations());
incrementBatch.add(inc);
} catch (SQLException e) {
exceptions[i] = e;
@@ -2355,7 +2357,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
Sequence sequence = toIncrementList.get(i);
Result result = (Result)resultObjects[i];
try {
- values[indexes[i]] = sequence.incrementValue(result, op);
+ long numToAllocate = Bytes.toLong(incrementBatch.get(i).getAttribute(SequenceRegionObserver.NUM_TO_ALLOCATE));
+ values[indexes[i]] = sequence.incrementValue(result, op, numToAllocate);
} catch (SQLException e) {
exceptions[indexes[i]] = e;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 4d582be..3fa0c1e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -66,6 +66,7 @@ import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.Sequence;
+import org.apache.phoenix.schema.SequenceAllocation;
import org.apache.phoenix.schema.SequenceAlreadyExistsException;
import org.apache.phoenix.schema.SequenceInfo;
import org.apache.phoenix.schema.SequenceKey;
@@ -385,13 +386,13 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
}
@Override
- public void validateSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values,
- SQLException[] exceptions, Sequence.ValueOp action) throws SQLException {
+ public void validateSequences(List<SequenceAllocation> sequenceAllocations, long timestamp,
+ long[] values, SQLException[] exceptions, Sequence.ValueOp action) throws SQLException {
int i = 0;
- for (SequenceKey key : sequenceKeys) {
- SequenceInfo info = sequenceMap.get(key);
+ for (SequenceAllocation sequenceAllocation : sequenceAllocations) {
+ SequenceInfo info = sequenceMap.get(sequenceAllocation.getSequenceKey());
if (info == null) {
- exceptions[i] = new SequenceNotFoundException(key.getSchemaName(), key.getSequenceName());
+ exceptions[i] = new SequenceNotFoundException(sequenceAllocation.getSequenceKey().getSchemaName(), sequenceAllocation.getSequenceKey().getSequenceName());
} else {
values[i] = info.sequenceValue;
}
@@ -400,10 +401,11 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
}
@Override
- public void incrementSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values,
- SQLException[] exceptions) throws SQLException {
+ public void incrementSequences(List<SequenceAllocation> sequenceAllocations, long timestamp,
+ long[] values, SQLException[] exceptions) throws SQLException {
int i = 0;
- for (SequenceKey key : sequenceKeys) {
+ for (SequenceAllocation sequenceAllocation : sequenceAllocations) {
+ SequenceKey key = sequenceAllocation.getSequenceKey();
SequenceInfo info = sequenceMap.get(key);
if (info == null) {
exceptions[i] = new SequenceNotFoundException(
@@ -429,7 +431,7 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
i = 0;
for (SQLException e : exceptions) {
if (e != null) {
- sequenceMap.remove(sequenceKeys.get(i));
+ sequenceMap.remove(sequenceAllocations.get(i).getSequenceKey());
}
i++;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index 2a98cd5..4153652 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -42,6 +42,7 @@ import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.Sequence;
+import org.apache.phoenix.schema.SequenceAllocation;
import org.apache.phoenix.schema.SequenceKey;
import org.apache.phoenix.schema.stats.PTableStats;
@@ -183,15 +184,15 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
}
@Override
- public void validateSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values,
- SQLException[] exceptions, Sequence.ValueOp action) throws SQLException {
- getDelegate().validateSequences(sequenceKeys, timestamp, values, exceptions, action);
+ public void validateSequences(List<SequenceAllocation> sequenceAllocations, long timestamp,
+ long[] values, SQLException[] exceptions, Sequence.ValueOp action) throws SQLException {
+ getDelegate().validateSequences(sequenceAllocations, timestamp, values, exceptions, action);
}
@Override
- public void incrementSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values,
- SQLException[] exceptions) throws SQLException {
- getDelegate().incrementSequences(sequenceKeys, timestamp, values, exceptions);
+ public void incrementSequences(List<SequenceAllocation> sequenceAllocations, long timestamp,
+ long[] values, SQLException[] exceptions) throws SQLException {
+ getDelegate().incrementSequences(sequenceAllocations, timestamp, values, exceptions);
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 9e74d2a..9d0c1aa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1255,7 +1255,7 @@ public class MetaDataClient {
dataTable.getTimeStamp());
long[] seqValues = new long[1];
SQLException[] sqlExceptions = new SQLException[1];
- connection.getQueryServices().incrementSequences(Collections.singletonList(key),
+ connection.getQueryServices().incrementSequences(Collections.singletonList(new SequenceAllocation(key, 1)),
Math.max(timestamp, dataTable.getTimeStamp()), seqValues, sqlExceptions);
if (sqlExceptions[0] != null) {
throw sqlExceptions[0];
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
index aeba58b..adca5e8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
@@ -62,7 +62,7 @@ import com.google.common.math.LongMath;
public class Sequence {
public static final int SUCCESS = 0;
- public enum ValueOp {VALIDATE_SEQUENCE, RESERVE_SEQUENCE, INCREMENT_SEQUENCE};
+ public enum ValueOp {VALIDATE_SEQUENCE, INCREMENT_SEQUENCE};
public enum MetaOp {CREATE_SEQUENCE, DROP_SEQUENCE, RETURN_SEQUENCE};
// create empty Sequence key values used while created a sequence row
@@ -144,7 +144,7 @@ public class Sequence {
return value.isDeleted ? null : value;
}
- private long increment(SequenceValue value, ValueOp op) throws SQLException {
+ private long increment(SequenceValue value, ValueOp op, long numToAllocate) throws SQLException {
boolean increasingSeq = value.incrementBy > 0 && op != ValueOp.VALIDATE_SEQUENCE;
// check if the the sequence has already reached the min/max limit
if (value.limitReached && op != ValueOp.VALIDATE_SEQUENCE) {
@@ -165,7 +165,8 @@ public class Sequence {
boolean overflowOrUnderflow=false;
// advance currentValue while checking for overflow
try {
- value.currentValue = LongMath.checkedAdd(value.currentValue, value.incrementBy);
+ // advance by numToAllocate * the increment amount
+ value.currentValue = LongMath.checkedAdd(value.currentValue, numToAllocate * value.incrementBy);
} catch (ArithmeticException e) {
overflowOrUnderflow = true;
}
@@ -180,18 +181,92 @@ public class Sequence {
return returnValue;
}
- public long incrementValue(long timestamp, ValueOp op) throws SQLException {
+ public long incrementValue(long timestamp, ValueOp op, long numToAllocate) throws SQLException {
SequenceValue value = findSequenceValue(timestamp);
if (value == null) {
throw EMPTY_SEQUENCE_CACHE_EXCEPTION;
}
- if (value.currentValue == value.nextValue) {
+
+ if (isSequenceCacheExhausted(numToAllocate, value)) {
if (op == ValueOp.VALIDATE_SEQUENCE) {
return value.currentValue;
}
throw EMPTY_SEQUENCE_CACHE_EXCEPTION;
- }
- return increment(value, op);
+ }
+ return increment(value, op, numToAllocate);
+ }
+
+ /**
+ * This method first checks whether value.currentValue = value.nextValue, this check is what
+ * determines whether we need to refresh the cache when evaluating NEXT VALUE FOR. Once
+ * current value reaches the next value we know the cache is exhausted as we give sequence
+ * values out one at time.
+ *
+ * However for bulk allocations, evaluated by NEXT <n> VALUE FOR, we need a different check
+ * @see isSequenceCacheExhaustedForBulkAllocation
+ *
+ * Using the bulk allocation method for determining if the cache is exhausted for both cases
+ * works in most of the cases, however when dealing with CYCLEs and overflow and underflow, things
+ * break down due to things like sign changes that can happen if we overflow from a positive to
+ * a negative number and vice versa. Therefore, leaving both checks in place.
+ *
+ */
+ private boolean isSequenceCacheExhausted(final long numToAllocate, final SequenceValue value) throws SQLException {
+ return value.currentValue == value.nextValue || (SequenceUtil.isBulkAllocation(numToAllocate) && isSequenceCacheExhaustedForBulkAllocation(numToAllocate, value));
+ }
+
+ /**
+ * This method checks whether there are sufficient values in the SequenceValue
+ * cached on the client to allocate the requested number of slots. It handles
+ * decreasing and increasing sequences as well as any overflows or underflows
+ * encountered.
+ */
+ private boolean isSequenceCacheExhaustedForBulkAllocation(final long numToAllocate, final SequenceValue value) throws SQLException {
+ long targetSequenceValue;
+
+ performValidationForBulkAllocation(numToAllocate, value);
+
+ try {
+ targetSequenceValue = LongMath.checkedAdd(value.currentValue, numToAllocate * value.incrementBy);
+ } catch (ArithmeticException e) {
+ // Perform a CheckedAdd to make sure if over/underflow
+ // We don't treat this as the cache being exhausted as the current value may be valid in the case
+ // of no cycle, logic in increment() will take care of detecting we've hit the limit of the sequence
+ return false;
+ }
+
+ if (value.incrementBy > 0) {
+ return targetSequenceValue > value.nextValue;
+ } else {
+ return targetSequenceValue < value.nextValue;
+ }
+ }
+
+ /**
+ * @throws SQLException with the correct error code if sequence limit is reached with
+ * this request for allocation or we attempt to perform a bulk allocation on a sequence
+ * with cycles.
+ */
+ private void performValidationForBulkAllocation(final long numToAllocate, final SequenceValue value)
+ throws SQLException {
+ boolean increasingSeq = value.incrementBy > 0 ? true : false;
+
+ // We don't support Bulk Allocations on sequences that have the CYCLE flag set to true
+ // Check for this here so we fail on expression evaluation and don't allow corner case
+ // whereby a client requests less than cached number of slots on sequence with cycle to succeed
+ if (value.cycle && !SequenceUtil.isCycleAllowed(numToAllocate)) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.NUM_SEQ_TO_ALLOCATE_NOT_SUPPORTED)
+ .setSchemaName(key.getSchemaName())
+ .setTableName(key.getSequenceName())
+ .build().buildException();
+ }
+
+ if (SequenceUtil.checkIfLimitReached(value.currentValue, value.minValue, value.maxValue, value.incrementBy, value.cacheSize, numToAllocate)) {
+ throw new SQLExceptionInfo.Builder(SequenceUtil.getLimitReachedErrorCode(increasingSeq))
+ .setSchemaName(key.getSchemaName())
+ .setTableName(key.getSequenceName())
+ .build().buildException();
+ }
}
public List<Append> newReturns() {
@@ -249,7 +324,7 @@ public class Sequence {
return key;
}
- public long incrementValue(Result result, ValueOp op) throws SQLException {
+ public long incrementValue(Result result, ValueOp op, long numToAllocate) throws SQLException {
// In this case, we don't definitely know the timestamp of the deleted sequence,
// but we know anything older is likely deleted. Worse case, we remove a sequence
// from the cache that we shouldn't have which will cause a gap in sequence values.
@@ -270,19 +345,21 @@ public class Sequence {
.build().buildException();
}
// If we found the sequence, we update our cache with the new value
- SequenceValue value = new SequenceValue(result, op);
+ SequenceValue value = new SequenceValue(result, op, numToAllocate);
insertSequenceValue(value);
- return increment(value, op);
+ return increment(value, op, numToAllocate);
}
+
@SuppressWarnings("deprecation")
- public Increment newIncrement(long timestamp, Sequence.ValueOp action) {
+ public Increment newIncrement(long timestamp, Sequence.ValueOp action, long numToAllocate) {
Increment inc = new Increment(key.getKey());
// It doesn't matter what we set the amount too - we always use the values we get
// from the Get we do to prevent any race conditions. All columns that get added
// are returned with their current value
try {
inc.setTimeRange(MetaDataProtocol.MIN_TABLE_TIMESTAMP, timestamp);
+ inc.setAttribute(SequenceRegionObserver.NUM_TO_ALLOCATE, Bytes.toBytes(numToAllocate));
} catch (IOException e) {
throw new RuntimeException(e); // Impossible
}
@@ -413,7 +490,7 @@ public class Sequence {
return this.incrementBy == 0;
}
- public SequenceValue(Result r, ValueOp op) {
+ public SequenceValue(Result r, ValueOp op, long numToAllocate) {
KeyValue currentValueKV = getCurrentValueKV(r);
KeyValue incrementByKV = getIncrementByKV(r);
KeyValue cacheSizeKV = getCacheSizeKV(r);
@@ -429,8 +506,12 @@ public class Sequence {
this.cycle = (Boolean) PBoolean.INSTANCE.toObject(cycleKV.getValueArray(), cycleKV.getValueOffset(), cycleKV.getValueLength());
this.limitReached = false;
currentValue = nextValue;
+
if (op != ValueOp.VALIDATE_SEQUENCE) {
- currentValue -= incrementBy * cacheSize;
+ // We can't just take the max of numToAllocate and cacheSize
+ // We need to handle a valid edgecase where a client requests bulk allocation of
+ // a number of slots that are less than cache size of the sequence
+ currentValue -= incrementBy * (SequenceUtil.isBulkAllocation(numToAllocate) ? numToAllocate : cacheSize);
}
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceAllocation.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceAllocation.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceAllocation.java
new file mode 100644
index 0000000..afb4a20
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceAllocation.java
@@ -0,0 +1,48 @@
+package org.apache.phoenix.schema;
+
+/**
+ * A SequenceKey and the number of slots requested to be allocated for the sequence.
+ * It binds these two together to allow operations such as sorting
+ * a Collection of SequenceKeys and at the same time preserving the associated requested
+ * number of slots to allocate.
+ *
+ * This class delegates hashCode, equals and compareTo to @see{SequenceKey}.
+ *
+ */
+public class SequenceAllocation implements Comparable<SequenceAllocation> {
+
+ private final SequenceKey sequenceKey;
+ private final long numAllocations;
+
+ public SequenceAllocation(SequenceKey sequenceKey, long numAllocations) {
+ this.sequenceKey = sequenceKey;
+ this.numAllocations = numAllocations;
+ }
+
+
+ public SequenceKey getSequenceKey() {
+ return sequenceKey;
+ }
+
+
+ public long getNumAllocations() {
+ return numAllocations;
+ }
+
+
+ @Override
+ public int hashCode() {
+ return sequenceKey.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return sequenceKey.equals(obj);
+ }
+
+ @Override
+ public int compareTo(SequenceAllocation that) {
+ return sequenceKey.compareTo(that.sequenceKey);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java
index f97d565..acf1864 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java
@@ -16,6 +16,7 @@ import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.schema.SequenceInfo;
+import com.google.common.base.Preconditions;
import com.google.common.math.LongMath;
/**
@@ -23,17 +24,24 @@ import com.google.common.math.LongMath;
*/
public class SequenceUtil {
+ public static final long DEFAULT_NUM_SLOTS_TO_ALLOCATE = 1L;
+
/**
- * Returns the nextValue of a sequence
- * @throws SQLException if cycle is false and the sequence limit has been reached
+ * @return true if we limit of a sequence has been reached.
*/
public static boolean checkIfLimitReached(long currentValue, long minValue, long maxValue,
- long incrementBy, long cacheSize) throws SQLException {
+ long incrementBy, long cacheSize, long numToAllocate) {
long nextValue = 0;
boolean increasingSeq = incrementBy > 0 ? true : false;
// advance currentValue while checking for overflow
try {
- long incrementValue = LongMath.checkedMultiply(incrementBy, cacheSize);
+ long incrementValue;
+ if (isBulkAllocation(numToAllocate)) {
+ // For bulk allocation we increment independent of cache size
+ incrementValue = LongMath.checkedMultiply(incrementBy, numToAllocate);
+ } else {
+ incrementValue = LongMath.checkedMultiply(incrementBy, cacheSize);
+ }
nextValue = LongMath.checkedAdd(currentValue, incrementValue);
} catch (ArithmeticException e) {
return true;
@@ -46,9 +54,28 @@ public class SequenceUtil {
}
return false;
}
+
+ public static boolean checkIfLimitReached(long currentValue, long minValue, long maxValue,
+ long incrementBy, long cacheSize) throws SQLException {
+ return checkIfLimitReached(currentValue, minValue, maxValue, incrementBy, cacheSize, DEFAULT_NUM_SLOTS_TO_ALLOCATE);
+ }
public static boolean checkIfLimitReached(SequenceInfo info) throws SQLException {
- return checkIfLimitReached(info.sequenceValue, info.minValue, info.maxValue, info.incrementBy, info.cacheSize);
+ return checkIfLimitReached(info.sequenceValue, info.minValue, info.maxValue, info.incrementBy, info.cacheSize, DEFAULT_NUM_SLOTS_TO_ALLOCATE);
+ }
+
+ /**
+ * Returns true if the value of numToAllocate signals that a bulk allocation of sequence slots
+ * was requested. Prevents proliferation of same comparison in many places throughout the code.
+ */
+ public static boolean isBulkAllocation(long numToAllocate) {
+ Preconditions.checkArgument(numToAllocate > 0);
+ return numToAllocate > DEFAULT_NUM_SLOTS_TO_ALLOCATE;
+ }
+
+ public static boolean isCycleAllowed(long numToAllocate) {
+ return !isBulkAllocation(numToAllocate);
+
}
/**
@@ -59,5 +86,15 @@ public class SequenceUtil {
return new SQLExceptionInfo.Builder(code).setSchemaName(schemaName).setTableName(tableName)
.build().buildException();
}
+
+ /**
+ * Returns the correct instance of SQLExceptionCode when we detect a limit has been reached,
+ * depending upon whether a min or max value caused the limit to be exceeded.
+ */
+ public static SQLExceptionCode getLimitReachedErrorCode(boolean increasingSeq) {
+ SQLExceptionCode code = increasingSeq ? SQLExceptionCode.SEQUENCE_VAL_REACHED_MAX_VALUE
+ : SQLExceptionCode.SEQUENCE_VAL_REACHED_MIN_VALUE;
+ return code;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/test/java/org/apache/phoenix/schema/SequenceAllocationTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/schema/SequenceAllocationTest.java b/phoenix-core/src/test/java/org/apache/phoenix/schema/SequenceAllocationTest.java
new file mode 100644
index 0000000..4a825f2
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/schema/SequenceAllocationTest.java
@@ -0,0 +1,59 @@
+package org.apache.phoenix.schema;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class SequenceAllocationTest {
+
+ @Test
+ /**
+ * Validates that sorting a List of SequenceAllocation instances
+ * results in the same sort order as sorting SequenceKey instances.
+ */
+ public void testSortingSequenceAllocation() {
+
+ // Arrange
+ SequenceKey sequenceKey1 = new SequenceKey(null, "seqalloc", "sequenceC",QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
+ SequenceKey sequenceKey2 = new SequenceKey(null, "seqalloc", "sequenceB",QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
+ SequenceKey sequenceKey3 = new SequenceKey(null, "seqalloc", "sequenceA",QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
+ List<SequenceKey> sequenceKeys = Lists.newArrayList(sequenceKey1, sequenceKey2, sequenceKey3);
+ List<SequenceAllocation> sequenceAllocations = Lists.newArrayList(new SequenceAllocation(sequenceKey2, 1), new SequenceAllocation(sequenceKey1, 1), new SequenceAllocation(sequenceKey3, 1));
+
+ // Act
+ Collections.sort(sequenceKeys);
+ Collections.sort(sequenceAllocations);
+
+ // Assert
+ int i = 0;
+ for (SequenceKey sequenceKey : sequenceKeys) {
+ assertEquals(sequenceKey, sequenceAllocations.get(i).getSequenceKey());
+ i++;
+ }
+ }
+
+ @Test
+ public void testSortingSequenceAllocationPreservesAllocations() {
+
+ // Arrange
+ SequenceKey sequenceKeyC = new SequenceKey(null, "seqalloc", "sequenceC",QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
+ SequenceKey sequenceKeyB = new SequenceKey(null, "seqalloc", "sequenceB",QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
+ SequenceKey sequenceKeyA = new SequenceKey(null, "seqalloc", "sequenceA",QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
+ List<SequenceAllocation> sequenceAllocations = Lists.newArrayList(new SequenceAllocation(sequenceKeyB, 15), new SequenceAllocation(sequenceKeyC, 11), new SequenceAllocation(sequenceKeyA, 1000));
+
+ // Act
+ Collections.sort(sequenceAllocations);
+
+ // Assert
+ assertEquals("sequenceA",sequenceAllocations.get(0).getSequenceKey().getSequenceName());
+ assertEquals(1000,sequenceAllocations.get(0).getNumAllocations());
+ assertEquals(15,sequenceAllocations.get(1).getNumAllocations());
+ assertEquals(11,sequenceAllocations.get(2).getNumAllocations());
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java
index f25a213..2abc482 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java
@@ -62,4 +62,58 @@ public class SequenceUtilTest {
public void testDescendingOverflowCycle() throws SQLException {
assertTrue(SequenceUtil.checkIfLimitReached(Long.MIN_VALUE, Long.MIN_VALUE, 0, -1/* incrementBy */, CACHE_SIZE));
}
+
+ @Test
+ public void testBulkAllocationAscendingNextValueGreaterThanMax() throws SQLException {
+ assertTrue(SequenceUtil.checkIfLimitReached(MAX_VALUE, MIN_VALUE, MAX_VALUE, 2/* incrementBy */, CACHE_SIZE, 1));
+ }
+
+ @Test
+ public void testBulkAllocationAscendingNextValueReachLimit() throws SQLException {
+ assertFalse(SequenceUtil.checkIfLimitReached(6, MIN_VALUE, MAX_VALUE, 2/* incrementBy */, CACHE_SIZE, 2));
+ }
+
+ @Test
+ public void testBulkAllocationAscendingNextValueWithinLimit() throws SQLException {
+ assertFalse(SequenceUtil.checkIfLimitReached(5, MIN_VALUE, MAX_VALUE, 2/* incrementBy */, CACHE_SIZE, 2));
+
+ }
+
+ @Test
+ public void testBulkAllocationAscendingOverflow() throws SQLException {
+ assertTrue(SequenceUtil.checkIfLimitReached(Long.MAX_VALUE, 0, Long.MAX_VALUE, 1/* incrementBy */, CACHE_SIZE, 100));
+ }
+
+
+ @Test
+ public void testBulkAllocationDescendingNextValueLessThanMax() throws SQLException {
+ assertTrue(SequenceUtil.checkIfLimitReached(10, MIN_VALUE, MAX_VALUE, -2/* incrementBy */, CACHE_SIZE, 5));
+ }
+
+ @Test
+ public void testBulkAllocationDescendingNextValueReachLimit() throws SQLException {
+ assertFalse(SequenceUtil.checkIfLimitReached(7, MIN_VALUE, MAX_VALUE, -2/* incrementBy */, CACHE_SIZE, 3));
+ }
+
+ @Test
+ public void testBulkAllocationDescendingNextValueWithinLimit() throws SQLException {
+ assertFalse(SequenceUtil.checkIfLimitReached(8, MIN_VALUE, MAX_VALUE, -2/* incrementBy */, CACHE_SIZE, 2));
+
+ }
+
+ @Test
+ public void testBulkAllocationDescendingOverflowCycle() throws SQLException {
+ assertTrue(SequenceUtil.checkIfLimitReached(Long.MIN_VALUE, Long.MIN_VALUE, 0, -1/* incrementBy */, CACHE_SIZE, 100));
+ }
+
+ @Test
+ public void testIsCycleAllowedForBulkAllocation() {
+ assertFalse(SequenceUtil.isCycleAllowed(2));
+ }
+
+ @Test
+ public void testIsCycleAllowedForStandardAllocation() {
+ assertTrue(SequenceUtil.isCycleAllowed(1));
+ }
+
}
[2/2] phoenix git commit: PHOENIX-1954 Reserve chunks of numbers for
a sequence (Jan Fernando)
Posted by ja...@apache.org.
PHOENIX-1954 Reserve chunks of numbers for a sequence (Jan Fernando)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/3b1bfa0d
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/3b1bfa0d
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/3b1bfa0d
Branch: refs/heads/master
Commit: 3b1bfa0d7b83f0b9ee0ad535d6e1f99777c14cb6
Parents: 984e622
Author: James Taylor <jt...@salesforce.com>
Authored: Thu Jul 9 20:49:03 2015 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Thu Jul 9 20:56:48 2015 -0700
----------------------------------------------------------------------
.../end2end/SequenceBulkAllocationIT.java | 1286 ++++++++++++++++++
phoenix-core/src/main/antlr3/PhoenixSQL.g | 5 +-
.../apache/phoenix/compile/SequenceManager.java | 70 +-
.../compile/SequenceValueExpression.java | 14 +-
.../coprocessor/SequenceRegionObserver.java | 64 +-
.../phoenix/exception/SQLExceptionCode.java | 2 +
.../apache/phoenix/jdbc/PhoenixStatement.java | 10 +-
.../apache/phoenix/parse/ParseNodeFactory.java | 6 +-
.../phoenix/parse/SequenceValueParseNode.java | 8 +-
.../phoenix/query/ConnectionQueryServices.java | 5 +-
.../query/ConnectionQueryServicesImpl.java | 23 +-
.../query/ConnectionlessQueryServicesImpl.java | 20 +-
.../query/DelegateConnectionQueryServices.java | 13 +-
.../apache/phoenix/schema/MetaDataClient.java | 2 +-
.../org/apache/phoenix/schema/Sequence.java | 107 +-
.../phoenix/schema/SequenceAllocation.java | 48 +
.../org/apache/phoenix/util/SequenceUtil.java | 47 +-
.../phoenix/schema/SequenceAllocationTest.java | 59 +
.../apache/phoenix/util/SequenceUtilTest.java | 54 +
19 files changed, 1763 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceBulkAllocationIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceBulkAllocationIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceBulkAllocationIT.java
new file mode 100644
index 0000000..e7db1ec
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceBulkAllocationIT.java
@@ -0,0 +1,1286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Suite of integration tests that validate that Bulk Allocation of Sequence values
+ * using the NEXT <n> VALUES FOR <seq> syntax works as expected and interacts
+ * correctly with NEXT VALUE FOR <seq> and CURRENT VALUE FOR <seq>.
+ *
+ * All tests are run with both a generic connection and a multi-tenant connection.
+ *
+ */
+@RunWith(Parameterized.class)
+public class SequenceBulkAllocationIT extends BaseClientManagedTimeIT {
+
+ private static final long BATCH_SIZE = 3;
+ private static final String SELECT_NEXT_VALUE_SQL =
+ "SELECT NEXT VALUE FOR %s FROM SYSTEM.\"SEQUENCE\" LIMIT 1";
+ private static final String SELECT_CURRENT_VALUE_SQL =
+ "SELECT CURRENT VALUE FOR %s FROM SYSTEM.\"SEQUENCE\" LIMIT 1";
+ private static final String CREATE_SEQUENCE_NO_MIN_MAX_TEMPLATE =
+ "CREATE SEQUENCE bulkalloc.alpha START WITH %s INCREMENT BY %s CACHE %s";
+ private static final String CREATE_SEQUENCE_WITH_MIN_MAX_TEMPLATE =
+ "CREATE SEQUENCE bulkalloc.alpha START WITH %s INCREMENT BY %s MINVALUE %s MAXVALUE %s CACHE %s";
+ private static final String CREATE_SEQUENCE_WITH_MIN_MAX_AND_CYCLE_TEMPLATE =
+ "CREATE SEQUENCE bulkalloc.alpha START WITH %s INCREMENT BY %s MINVALUE %s MAXVALUE %s CYCLE CACHE %s";
+
+
+ private Connection conn;
+ private String tenantId;
+
+ public SequenceBulkAllocationIT(String tenantId) {
+ this.tenantId = tenantId;
+ }
+
+ @BeforeClass
+ @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
+ public static void doSetup() throws Exception {
+ Map<String, String> props = getDefaultProps();
+ // Must update config before starting server
+ props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB, Long.toString(BATCH_SIZE));
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ // close any open connection between tests, so that connections are not leaked
+ if (conn != null) {
+ conn.close();
+ }
+ }
+
+ @Parameters
+ public static Object[] data() {
+ return new Object[] {null, "tenant1"};
+ }
+
+
+ @Test
+ public void testSequenceParseNextValuesWithNull() throws Exception {
+ nextConnection();
+ try {
+ conn.createStatement().executeQuery(
+ "SELECT NEXT NULL VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\" LIMIT 1");
+ fail("null is not allowed to be used for <n> in NEXT <n> VALUES FOR <seq>");
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.NUM_SEQ_TO_ALLOCATE_MUST_BE_CONSTANT.getErrorCode(),
+ e.getErrorCode());
+ assertTrue(e.getNextException() == null);
+ }
+ }
+
+ @Test
+ public void testSequenceParseNextValuesWithNonNumber() throws Exception {
+ nextConnection();
+ try {
+ conn.createStatement().executeQuery(
+ "SELECT NEXT '89b' VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\" LIMIT 1");
+ fail("Only integers and longs are allowed to be used for <n> in NEXT <n> VALUES FOR <seq>");
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.NUM_SEQ_TO_ALLOCATE_MUST_BE_CONSTANT.getErrorCode(),
+ e.getErrorCode());
+ assertTrue(e.getNextException() == null);
+ }
+ }
+
+
+ @Test
+ public void testSequenceParseNextValuesWithNegativeNumber() throws Exception {
+ nextConnection();
+ try {
+ conn.createStatement().executeQuery(
+ "SELECT NEXT '-1' VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\" LIMIT 1");
+ fail("null is not allowed to be used for <n> in NEXT <n> VALUES FOR <seq>");
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.NUM_SEQ_TO_ALLOCATE_MUST_BE_CONSTANT.getErrorCode(),
+ e.getErrorCode());
+ assertTrue(e.getNextException() == null);
+ }
+ }
+
+ @Test
+ public void testParseNextValuesSequenceWithZeroAllocated() throws Exception {
+ nextConnection();
+ try {
+ conn.createStatement().executeQuery(
+ "SELECT NEXT 0 VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\" LIMIT 1");
+ fail("Only integers and longs are allowed to be used for <n> in NEXT <n> VALUES FOR <seq>");
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.NUM_SEQ_TO_ALLOCATE_MUST_BE_CONSTANT.getErrorCode(),
+ e.getErrorCode());
+ assertTrue(e.getNextException() == null);
+ }
+ }
+
+
+ @Test
+ public void testNextValuesForSequenceWithNoAllocatedValues() throws Exception {
+ // Create Sequence
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(1).startsWith(1).cacheSize(1)
+ .numAllocated(100).build();
+
+ nextConnection();
+ createSequenceWithNoMinMax(props);
+ nextConnection();
+
+ // Bulk Allocate Sequence Slots
+ final int currentValueAfterAllocation = 100;
+ reserveSlotsInBulkAndAssertValue(1, props.numAllocated);
+ assertExpectedStateInSystemSequence(props, 101);
+ assertExpectedNumberOfValuesAllocated(1, currentValueAfterAllocation, props.incrementBy, props.numAllocated);
+
+ // Assert standard Sequence Operations return expected values
+ assertExpectedCurrentValueForSequence(currentValueAfterAllocation);
+ assertExpectedNextValueForSequence(101);
+ }
+
+ @Test
+ /**
+ * Validates we can invoke NEXT <n> VALUES FOR using bind vars.
+ */
+ public void testNextValuesForSequenceUsingBinds() throws Exception {
+
+ // Create Sequence
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(1).startsWith(1).cacheSize(1)
+ .numAllocated(100).build();
+
+ nextConnection();
+ createSequenceWithNoMinMax(props);
+ nextConnection();
+
+ // Allocate 100 slots using SQL with Bind Params and a PreparedStatement
+ final int currentValueAfterAllocation = 100;
+ reserveSlotsInBulkUsingBindsAndAssertValue(1,props.numAllocated);
+ assertExpectedStateInSystemSequence(props, 101);
+ assertExpectedNumberOfValuesAllocated(1, currentValueAfterAllocation, props.incrementBy, props.numAllocated);
+
+ // Assert standard Sequence Operations return expected values
+ assertExpectedCurrentValueForSequence(currentValueAfterAllocation);
+ assertExpectedNextValueForSequence(101);
+ }
+
+
+ @Test
+ public void testNextValuesForSequenceWithPreviouslyAllocatedValues() throws Exception {
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(1).startsWith(1).cacheSize(100)
+ .numAllocated(1000).build();
+
+ nextConnection();
+ createSequenceWithNoMinMax(props);
+ nextConnection();
+
+ assertExpectedNextValueForSequence(1);
+ assertExpectedCurrentValueForSequence(1);
+ assertExpectedNextValueForSequence(2);
+
+ // Bulk Allocate Sequence Slots
+ int currentValueAfterAllocation = 1100;
+ int nextValueAfterAllocation = currentValueAfterAllocation + props.incrementBy;
+ int startValueAfterAllocation = 101;
+ reserveSlotsInBulkAndAssertValue(startValueAfterAllocation, props.numAllocated);
+ assertBulkAllocationSucceeded(props, currentValueAfterAllocation, startValueAfterAllocation);
+
+ // Assert standard Sequence Operations return expected values
+ assertExpectedCurrentValueForSequence(currentValueAfterAllocation);
+ assertExpectedNextValueForSequence(nextValueAfterAllocation);
+ }
+
+
+ @Test
+ /**
+ * Validates that if we close a connection after performing
+ * NEXT <n> VALUES FOR <seq> the values are correctly returned from
+ * the latest batch.
+ */
+ public void testConnectionCloseReturnsSequenceValuesCorrectly() throws Exception {
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(2).startsWith(1).cacheSize(100)
+ .numAllocated(100).build();
+ nextConnection();
+ createSequenceWithNoMinMax(props);
+ nextConnection();
+
+ assertExpectedNextValueForSequence(1);
+ assertExpectedCurrentValueForSequence(1);
+ assertExpectedNextValueForSequence(3);
+
+ // Bulk Allocate Sequence Slots
+ int currentValueAfterAllocation = 399;
+ int nextValueAfterAllocation = currentValueAfterAllocation + props.incrementBy;
+ int startValueAfterAllocation = 201;
+ reserveSlotsInBulkAndAssertValue(startValueAfterAllocation, props.numAllocated);
+ assertExpectedCurrentValueForSequence(currentValueAfterAllocation);
+
+ // Close the Connection
+ conn.close();
+
+ // Test that sequence, doesn't have gaps after closing the connection
+ nextConnection();
+ assertExpectedNextValueForSequence(nextValueAfterAllocation);
+ assertExpectedCurrentValueForSequence(nextValueAfterAllocation);
+
+ }
+
+ @Test
+ /**
+ * Validates that calling NEXT <n> VALUES FOR <seq> works correctly with UPSERT.
+ */
+ public void testNextValuesForSequenceWithUpsert() throws Exception {
+
+ // Create Sequence
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(1).startsWith(1).cacheSize(100)
+ .numAllocated(1000).build();
+ nextConnection();
+ createSequenceWithNoMinMax(props);
+
+ // Create TABLE
+ nextGenericConnection();
+ conn.createStatement().execute("CREATE TABLE bulkalloc.test ( id INTEGER NOT NULL PRIMARY KEY)");
+ nextConnection();
+
+ // Grab batch from Sequence
+ assertExpectedNextValueForSequence(1);
+ assertExpectedCurrentValueForSequence(1);
+ assertExpectedNextValueForSequence(2);
+ assertExpectedStateInSystemSequence(props, 101);
+
+
+ // Perform UPSERT and validate Sequence was incremented as expected
+ conn.createStatement().execute("UPSERT INTO bulkalloc.test (id) VALUES (NEXT " + props.numAllocated + " VALUES FOR bulkalloc.alpha)");
+ conn.commit();
+ assertExpectedStateInSystemSequence(props, 1101);
+
+ // SELECT values out and verify
+ nextConnection();
+ String query = "SELECT id, NEXT VALUE FOR bulkalloc.alpha FROM bulkalloc.test";
+ ResultSet rs = conn.prepareStatement(query).executeQuery();
+ assertTrue(rs.next());
+ assertEquals(101, rs.getInt(1)); // Threw out cache of 100, incremented by 1000
+ assertEquals(1101, rs.getInt(2));
+ assertFalse(rs.next());
+ }
+
+
+
+
+ @Test
+ public void testNextValuesForSequenceWithIncrementBy() throws Exception {
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(3).startsWith(1).cacheSize(100)
+ .numAllocated(1000).build();
+ nextConnection();
+ createSequenceWithNoMinMax(props);
+ nextConnection();
+
+ assertExpectedNextValueForSequence(1);
+ assertExpectedCurrentValueForSequence(1);
+ assertExpectedNextValueForSequence(4);
+
+ // Bulk Allocate Sequence Slots
+ int currentValueAfterAllocation = 3298;
+ int startValueAfterAllocation = 301;
+ reserveSlotsInBulkAndAssertValue(startValueAfterAllocation, props.numAllocated);
+ assertBulkAllocationSucceeded(props, currentValueAfterAllocation, startValueAfterAllocation);
+
+ // Assert standard Sequence Operations return expected values
+ assertExpectedCurrentValueForSequence(3298);
+ assertExpectedNextValueForSequence(3301);
+ }
+
+ @Test
+ public void testNextValuesForSequenceWithNegativeIncrementBy() throws Exception {
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(-1).startsWith(2000).cacheSize(100)
+ .numAllocated(1000).build();
+ nextConnection();
+ createSequenceWithNoMinMax(props);
+ nextConnection();
+
+ assertExpectedNextValueForSequence(2000);
+ assertExpectedCurrentValueForSequence(2000);
+ assertExpectedNextValueForSequence(1999);
+
+ // Bulk Allocate Sequence Slots
+ int currentValueAfterAllocation = 901;
+ int startValueAfterAllocation = 1900;
+ reserveSlotsInBulkAndAssertValue(startValueAfterAllocation, props.numAllocated);
+ assertBulkAllocationSucceeded(props, currentValueAfterAllocation, startValueAfterAllocation);
+
+
+ // Assert standard Sequence Operations return expected values
+ assertExpectedCurrentValueForSequence(901);
+ assertExpectedNextValueForSequence(900);
+ }
+
+ @Test
+ public void testNextValuesForSequenceWithNegativeIncrementByGreaterThanOne() throws Exception {
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(-5).startsWith(2000).cacheSize(100)
+ .numAllocated(100).build();
+ nextConnection();
+ createSequenceWithNoMinMax(props);
+ nextConnection();
+
+ // Pull first batch from Sequence
+ assertExpectedNextValueForSequence(2000);
+ assertExpectedCurrentValueForSequence(2000);
+ assertExpectedNextValueForSequence(1995);
+
+ // Bulk Allocate Sequence Slots
+ int currentValueAfterAllocation = 1005;
+ int startValueAfterAllocation = 1500;
+ reserveSlotsInBulkAndAssertValue(startValueAfterAllocation, props.numAllocated);
+ assertBulkAllocationSucceeded(props, currentValueAfterAllocation, startValueAfterAllocation);
+
+
+ // Assert standard Sequence Operations return expected values
+ assertExpectedCurrentValueForSequence(1005);
+ assertExpectedNextValueForSequence(1000);
+ }
+
+
+ @Test
+ /**
+ * Validates that for NEXT <n> VALUES FOR if you try an allocate more slots such that that
+ * we exceed the max value of the sequence we throw an exception. Allocating sequence values in bulk
+ * should be an all or nothing operation - if the operation succeeds clients are guaranteed that they
+ * have access to all slots requested.
+ */
+ public void testNextValuesForSequenceExceedsMaxValue() throws Exception {
+ final SequenceProperties sequenceProps =
+ new SequenceProperties.Builder().incrementBy(1).startsWith(100).cacheSize(100)
+ .numAllocated(1000).minValue(100).maxValue(900).build();
+
+ nextConnection();
+ createSequenceWithMinMax(sequenceProps);
+ nextConnection();
+
+ // Pull first batch from the sequence
+ assertExpectedNextValueForSequence(100);
+ assertExpectedCurrentValueForSequence(100);
+ assertExpectedNextValueForSequence(101);
+
+ // Attempt to bulk Allocate more slots than available
+ try {
+ conn.createStatement().executeQuery(
+ "SELECT NEXT " + sequenceProps.numAllocated
+ + " VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\" LIMIT 1");
+ fail("Invoking SELECT NEXT VALUES should have thrown Reached Max Value Exception");
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.SEQUENCE_VAL_REACHED_MAX_VALUE.getErrorCode(),
+ e.getErrorCode());
+ assertTrue(e.getNextException() == null);
+ }
+
+ // Assert sequence didn't advance
+ assertExpectedCurrentValueForSequence(101);
+ assertExpectedNextValueForSequence(102);
+ }
+
+ @Test
+ /**
+ * Validates that for NEXT <n> VALUES FOR if you try an allocate more slots such that that
+ * we exceed the min value of the sequence we throw an exception. Allocating sequence values in bulk
+ * should be an all or nothing operation - if the operation succeeds clients are guaranteed that they
+ * have access to all slots requested.
+ */
+ public void testNextValuesForSequenceExceedsMinValue() throws Exception {
+ final SequenceProperties sequenceProps =
+ new SequenceProperties.Builder().incrementBy(-5).startsWith(900).cacheSize(100)
+ .numAllocated(160).minValue(100).maxValue(900).build();
+
+ nextConnection();
+ createSequenceWithMinMax(sequenceProps);
+ nextConnection();
+
+ // Pull first batch from the sequence
+ assertExpectedNextValueForSequence(900);
+ assertExpectedCurrentValueForSequence(900);
+ assertExpectedNextValueForSequence(895);
+
+ // Attempt to bulk Allocate more slots than available
+ try {
+ conn.createStatement().executeQuery(
+ "SELECT NEXT " + sequenceProps.numAllocated
+ + " VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\" LIMIT 1");
+ fail("Invoking SELECT NEXT VALUES should have thrown Reached Max Value Exception");
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.SEQUENCE_VAL_REACHED_MIN_VALUE.getErrorCode(),
+ e.getErrorCode());
+ assertTrue(e.getNextException() == null);
+ }
+
+ // Assert sequence didn't advance (we still throw out the cached values)
+ assertExpectedCurrentValueForSequence(895);
+ assertExpectedNextValueForSequence(890);
+ }
+
+
+ @Test
+ /**
+ * Validates that if we don't exceed the limit bulk allocation works with sequences with a
+ * min and max defined.
+ */
+ public void testNextValuesForSequenceWithMinMaxDefined() throws Exception {
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(5).startsWith(100).cacheSize(100)
+ .numAllocated(1000).minValue(100).maxValue(6000).build();
+
+ nextConnection();
+ createSequenceWithMinMax(props);
+ nextConnection();
+
+ assertExpectedNextValueForSequence(100);
+ assertExpectedCurrentValueForSequence(100);
+ assertExpectedNextValueForSequence(105);
+
+ // Bulk Allocate Sequence Slots
+ int currentValueAfterAllocation = 5595;
+ int startValueAfterAllocation = 600;
+ reserveSlotsInBulkAndAssertValue(startValueAfterAllocation, props.numAllocated);
+ assertBulkAllocationSucceeded(props, currentValueAfterAllocation, startValueAfterAllocation);
+
+ // Assert standard Sequence Operations return expected values
+ assertExpectedCurrentValueForSequence(5595);
+ assertExpectedNextValueForSequence(5600);
+ }
+
+ @Test
+ public void testNextValuesForSequenceWithDefaultMax() throws Exception {
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(1).startsWith(100).cacheSize(100)
+ .numAllocated(Long.MAX_VALUE - 100).build();
+
+ // Create Sequence
+ nextConnection();
+ createSequenceWithMinMax(props);
+ nextConnection();
+
+ // Bulk Allocate Sequence Slots
+ long currentValueAfterAllocation = 100;
+ long startValueAfterAllocation = Long.MAX_VALUE;
+ reserveSlotsInBulkAndAssertValue(currentValueAfterAllocation, props.numAllocated);
+ assertExpectedStateInSystemSequence(props, startValueAfterAllocation);
+
+ // Try and get next value
+ try {
+ conn.createStatement().executeQuery(String.format(SELECT_NEXT_VALUE_SQL, "bulkalloc.alpha"));
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.SEQUENCE_VAL_REACHED_MAX_VALUE.getErrorCode(),
+ e.getErrorCode());
+ assertTrue(e.getNextException() == null);
+ }
+ }
+
+ @Test
+ /**
+ * Validates that if our current or start value is > 0 and we ask for Long.MAX
+ * and overflow to the next value, the correct Exception is thrown when
+ * the expression is evaluated.
+ */
+ public void testNextValuesForSequenceOverflowAllocation() throws Exception {
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(1).startsWith(100).cacheSize(100)
+ .numAllocated(Long.MAX_VALUE).build();
+
+ // Create Sequence
+ nextConnection();
+ createSequenceWithMinMax(props);
+ nextConnection();
+
+ // Bulk Allocate Sequence Slots
+ try {
+ conn.createStatement().executeQuery(
+ "SELECT NEXT " + Long.MAX_VALUE
+ + " VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\"");
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.SEQUENCE_VAL_REACHED_MAX_VALUE.getErrorCode(),
+ e.getErrorCode());
+ assertTrue(e.getNextException() == null);
+ }
+ }
+
+
+ @Test
+ /**
+ * Validates that specifying an bulk allocation less than the size of the cache defined on the sequence works
+ * as expected.
+ */
+ public void testNextValuesForSequenceAllocationLessThanCacheSize() throws Exception {
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(5).startsWith(100).cacheSize(100)
+ .numAllocated(50).minValue(100).maxValue(6000).build();
+
+ nextConnection();
+ createSequenceWithMinMax(props);
+ nextConnection();
+
+ assertExpectedNextValueForSequence(100);
+ assertExpectedCurrentValueForSequence(100);
+ assertExpectedNextValueForSequence(105);
+
+ // Bulk Allocate Sequence Slots
+ int currentValueAfterAllocation = 355;
+ int startValueAfterAllocation = 110;
+ reserveSlotsInBulkAndAssertValue(startValueAfterAllocation, props.numAllocated);
+ assertExpectedStateInSystemSequence(props, 600);
+ assertExpectedNumberOfValuesAllocated(startValueAfterAllocation, currentValueAfterAllocation, props.incrementBy, props.numAllocated);
+
+ // Assert standard Sequence Operations return expected values
+ // 105 + (50 * 5) = 355
+ assertExpectedCurrentValueForSequence(355);
+ assertExpectedNextValueForSequence(360);
+ assertExpectedNextValueForSequence(365);
+ assertExpectedNextValueForSequence(370);
+ }
+
+ @Test
+ /**
+ * Validates that specifying an bulk allocation less than the size of the cache defined on the sequence works
+ * as expected if we don't have enough values in the cache to support the allocation.
+ */
+ public void testNextValuesForInsufficentCacheValuesAllocationLessThanCacheSize() throws Exception {
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(5).startsWith(100).cacheSize(100)
+ .numAllocated(50).minValue(100).maxValue(6000).build();
+
+ nextConnection();
+ createSequenceWithMinMax(props);
+ nextConnection();
+
+ // Allocate 51 slots, only 49 will be left
+ int currentValueAfter51Allocations = 355; // 100 + 51 * 5
+ for (int i = 100; i <= currentValueAfter51Allocations; i = i + 5) {
+ assertExpectedNextValueForSequence(i);
+ }
+ assertExpectedCurrentValueForSequence(currentValueAfter51Allocations);
+
+ // Bulk Allocate 50 Sequence Slots which greater than amount left in cache
+ // This should throw away rest of the cache, and allocate the request slot
+ // from the next start value
+ int currentValueAfterAllocation = 845;
+ int startValueAfterAllocation = 600;
+ reserveSlotsInBulkAndAssertValue(startValueAfterAllocation, props.numAllocated);
+ assertBulkAllocationSucceeded(props, currentValueAfterAllocation, startValueAfterAllocation);
+
+ // Assert standard Sequence Operations return expected values
+ assertExpectedCurrentValueForSequence(845);
+ assertExpectedNextValueForSequence(850);
+ assertExpectedNextValueForSequence(855);
+ assertExpectedNextValueForSequence(860);
+ }
+
+ @Test
+ /**
+ * Validates that for NEXT <n> VALUES FOR is not supported on Sequences that have the
+ * CYCLE flag set to true.
+ */
+ public void testNextValuesForSequenceWithCycles() throws Exception {
+ final SequenceProperties sequenceProps =
+ new SequenceProperties.Builder().incrementBy(5).startsWith(100).cacheSize(100)
+ .numAllocated(1000).minValue(100).maxValue(900).build();
+
+ nextConnection();
+ createSequenceWithMinMaxAndCycle(sequenceProps);
+ nextConnection();
+
+ // Full first batch from the sequence
+ assertExpectedNextValueForSequence(100);
+ assertExpectedCurrentValueForSequence(100);
+ assertExpectedNextValueForSequence(105);
+
+ // Attempt to bulk Allocate more slots than available
+ try {
+ conn.createStatement().executeQuery(
+ "SELECT NEXT " + sequenceProps.numAllocated
+ + " VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\" LIMIT 1");
+ fail("Invoking SELECT NEXT VALUES should have failed as operation is not supported for sequences with Cycles.");
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.NUM_SEQ_TO_ALLOCATE_NOT_SUPPORTED.getErrorCode(),
+ e.getErrorCode());
+ assertTrue(e.getNextException() == null);
+ }
+
+ // Assert sequence didn't advance
+ assertExpectedCurrentValueForSequence(105);
+ assertExpectedNextValueForSequence(110);
+ assertExpectedNextValueForSequence(115);
+ }
+
+ @Test
+ /**
+ * Validates that if we have multiple NEXT <n> VALUES FOR <seq> expression and the
+ * CURRENT VALUE FOR expression work correctly when used in the same statement.
+ */
+ public void testCurrentValueForAndNextValuesForExpressionsForSameSequence() throws Exception {
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(1).startsWith(1).cacheSize(100)
+ .numAllocated(1000).build();
+
+ nextConnection();
+ createSequenceWithNoMinMax(props);
+ nextConnection();
+
+ assertExpectedNextValueForSequence(1);
+ assertExpectedCurrentValueForSequence(1);
+ assertExpectedNextValueForSequence(2);
+
+ // Bulk Allocate Sequence Slots
+ int currentValueAfterAllocation = 1100;
+ int nextValueAfterAllocation = currentValueAfterAllocation + props.incrementBy;
+ int startValueAfterAllocation = 101;
+ ResultSet rs =
+ conn.createStatement().executeQuery(
+ "SELECT CURRENT VALUE FOR bulkalloc.alpha, NEXT " + props.numAllocated + " VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\"");
+ assertTrue(rs.next());
+ assertBulkAllocationSucceeded(props, currentValueAfterAllocation, startValueAfterAllocation);
+ int currentValueFor = rs.getInt(1);
+ int nextValuesFor = rs.getInt(2);
+ assertEquals("Expected the next value to be first value reserved", startValueAfterAllocation, nextValuesFor);
+ assertEquals("Expected current value to be the same as next value", startValueAfterAllocation, currentValueFor);
+
+ // Assert standard Sequence Operations return expected values
+ assertExpectedCurrentValueForSequence(currentValueAfterAllocation);
+ assertExpectedNextValueForSequence(nextValueAfterAllocation);
+ }
+
+ @Test
+ /**
+ * Validates that if we have multiple NEXT <n> VALUES FOR <seq> expressions for the *same* sequence
+ * in a statement we only process the one which has the highest value of <n> and return the start
+ * value for that for all expressions.
+ */
+ public void testMultipleNextValuesForExpressionsForSameSequence() throws Exception {
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(1).startsWith(1).cacheSize(100)
+ .numAllocated(1000).build();
+
+ nextConnection();
+ createSequenceWithNoMinMax(props);
+ nextConnection();
+
+ assertExpectedNextValueForSequence(1);
+ assertExpectedCurrentValueForSequence(1);
+ assertExpectedNextValueForSequence(2);
+
+ // Bulk Allocate Sequence Slots - One for 5 and one for 1000, 1000 should have precedence
+ int currentValueAfterAllocation = 1100;
+ int nextValueAfterAllocation = currentValueAfterAllocation + props.incrementBy;
+ int startValueAfterAllocation = 101;
+ ResultSet rs =
+ conn.createStatement().executeQuery(
+ "SELECT NEXT 5 VALUES FOR bulkalloc.alpha, NEXT " + props.numAllocated + " VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\"");
+ assertTrue(rs.next());
+ int firstValue = rs.getInt(1);
+ int secondValue = rs.getInt(2);
+ assertEquals("Expected both expressions to return the same value", firstValue, secondValue);
+ assertEquals("Expected the value returned to be the highest allocation", startValueAfterAllocation, firstValue);
+ assertBulkAllocationSucceeded(props, currentValueAfterAllocation, startValueAfterAllocation);
+
+ // Assert standard Sequence Operations return expected values
+ assertExpectedCurrentValueForSequence(currentValueAfterAllocation);
+ assertExpectedNextValueForSequence(nextValueAfterAllocation);
+ }
+
+ @Test
+ /**
+ * Validates that if we have NEXT VALUE FOR <seq> and NEXT <n> VALUES FOR <seq> expressions for the *same* sequence
+ * in a statement we only process way and honor the value of the highest value of <n>, where for
+ * NEXT VALUE FOR <seq> is assumed to be 1.
+ */
+ public void testMultipleDifferentExpressionsForSameSequence() throws Exception {
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(1).startsWith(1).cacheSize(100)
+ .numAllocated(1000).build();
+
+ nextConnection();
+ createSequenceWithNoMinMax(props);
+ nextConnection();
+
+ // Pull First Batch from Sequence
+ assertExpectedNextValueForSequence(1);
+
+ // Bulk Allocate Sequence Slots and Get Next Value in Same Statement
+ int currentValueAfterAllocation = 1100;
+ int nextValueAfterAllocation = currentValueAfterAllocation + props.incrementBy;
+ int startValueAfterAllocation = 101;
+ ResultSet rs =
+ conn.createStatement().executeQuery(
+ "SELECT NEXT VALUE FOR bulkalloc.alpha, "
+ + "NEXT " + props.numAllocated + " VALUES FOR bulkalloc.alpha, "
+ + "CURRENT VALUE FOR bulkalloc.alpha, "
+ + "NEXT 999 VALUES FOR bulkalloc.alpha "
+ + "FROM SYSTEM.\"SEQUENCE\"");
+ assertTrue(rs.next());
+ assertBulkAllocationSucceeded(props, currentValueAfterAllocation, startValueAfterAllocation);
+
+ // Assert all values returned are the same
+ // Expect them to be the highest value from NEXT VALUE or NEXT <n> VALUES FOR
+ int previousVal = 0;
+ for (int i = 1; i <= 4; i++) {
+ int currentVal = rs.getInt(i);
+ if (i != 1) {
+ assertEquals(
+ "Expected all NEXT VALUE FOR and NEXT <n> VALUES FOR expressions to return the same value",
+ previousVal, currentVal);
+ }
+ previousVal = currentVal;
+ }
+
+ // Assert standard Sequence Operations return expected values
+ assertExpectedCurrentValueForSequence(currentValueAfterAllocation);
+ assertExpectedNextValueForSequence(nextValueAfterAllocation);
+ }
+
+
+ @Test
+ /**
+ * Validates that using NEXT <n> VALUES FOR on different sequences in the
+ * same statement with *different* values of <n> works as expected. This
+ * test validates that we keep our numAllocated array and sequence keys in
+ * sync during the sequence management process.
+ */
+ public void testMultipleNextValuesForExpressionsForDifferentSequences() throws Exception {
+
+ nextConnection();
+ conn.createStatement().execute("CREATE SEQUENCE bulkalloc.alpha START WITH 30 INCREMENT BY 3 CACHE 100");
+ conn.createStatement().execute("CREATE SEQUENCE bulkalloc.beta START WITH 100 INCREMENT BY 5 CACHE 50");
+ nextConnection();
+
+ // Bulk Allocate Sequence Slots for Two Sequences
+ ResultSet rs =
+ conn.createStatement().executeQuery(
+ "SELECT NEXT 100 VALUES FOR bulkalloc.alpha, NEXT 1000 VALUES FOR bulkalloc.beta FROM SYSTEM.\"SEQUENCE\"");
+ assertTrue(rs.next());
+ assertEquals(30, rs.getInt(1));
+ assertEquals(100, rs.getInt(2));
+
+ // Assert standard Sequence Operations return expected values
+ for (int i = 330; i < 330 + (2 * 100); i += 3) {
+ assertExpectedCurrentValueForSequence(i - 3, "bulkalloc.alpha");
+ assertExpectedNextValueForSequence(i, "bulkalloc.alpha");
+ }
+
+ for (int i = 5100; i < 5100 + (2 * 1000); i += 5) {
+ assertExpectedCurrentValueForSequence(i - 5, "bulkalloc.beta");
+ assertExpectedNextValueForSequence(i, "bulkalloc.beta");
+ }
+ }
+
+ @Test
+ /**
+ * Validates that calling NEXT <n> VALUES FOR with EXPLAIN PLAN doesn't use
+ * allocate any slots.
+ */
+ public void testExplainPlanValidatesSequences() throws Exception {
+
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(3).startsWith(30).cacheSize(100)
+ .numAllocated(1000).build();
+
+ nextConnection();
+ createSequenceWithNoMinMax(props);
+
+ nextGenericConnection();
+ conn.createStatement().execute("CREATE TABLE bulkalloc.simpletbl (k BIGINT NOT NULL PRIMARY KEY)");
+ nextConnection();
+
+ // Bulk Allocate Sequence Slots
+ int startValueAfterAllocation = 30;
+ reserveSlotsInBulkAndAssertValue(startValueAfterAllocation, props.numAllocated);
+
+ // Execute EXPLAIN PLAN multiple times, which should not change Sequence values
+ for (int i = 0; i < 3; i++) {
+ conn.createStatement().executeQuery("EXPLAIN SELECT NEXT 1000 VALUES FOR bulkalloc.alpha FROM bulkalloc.simpletbl");
+ }
+
+ // Validate the current value was not advanced and was the starting value
+ assertExpectedStateInSystemSequence(props, 3030);
+
+ // Assert standard Sequence Operations return expected values
+ int startValue = 3030;
+ for (int i = startValue; i < startValue + (2 * props.cacheSize); i += props.incrementBy) {
+ assertExpectedCurrentValueForSequence(i - props.incrementBy);
+ assertExpectedNextValueForSequence(i);
+ }
+ }
+
+ @Test
+ public void testExplainPlanForNextValuesFor() throws Exception {
+
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(3).startsWith(30).cacheSize(100)
+ .numAllocated(1000).build();
+
+ nextConnection();
+ createSequenceWithNoMinMax(props);
+ nextGenericConnection();
+ conn.createStatement().execute("CREATE TABLE bulkalloc.simpletbl (k BIGINT NOT NULL PRIMARY KEY)");
+ nextConnection();
+
+ // Execute EXPLAIN PLAN which should not change Sequence values
+ ResultSet rs = conn.createStatement().executeQuery("EXPLAIN SELECT NEXT 1000 VALUES FOR bulkalloc.alpha FROM bulkalloc.simpletbl");
+
+ // Assert output for Explain Plain result is as expected
+ assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER BULKALLOC.SIMPLETBL\n" +
+ " SERVER FILTER BY FIRST KEY ONLY\n" +
+ "CLIENT RESERVE VALUES FROM 1 SEQUENCE", QueryUtil.getExplainPlan(rs));
+ }
+
+
+ /**
+ * Performs a multithreaded test whereby we interleave reads from the result set of
+ * NEXT VALUE FOR and NEXT <n> VALUES FOR to make sure we get expected values with the
+ * following order of execution:
+ *
+ * 1) Execute expression NEXT <n> VALUES FOR <seq>
+ * 2) Execute expression NEXT VALUE FOR <seq>
+ * 3) Read back value from expression NEXT VALUE FOR <seq> via rs.next()
+ * 4) Read back value from expression NEXT <n> VALUES FOR <seq> via rs.next()
+ */
+ public void testNextValuesForMixedWithNextValueForMultiThreaded() throws Exception {
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(1).startsWith(1).cacheSize(100)
+ .numAllocated(1000).build();
+
+ nextConnection();
+ createSequenceWithNoMinMax(props);
+ nextConnection();
+
+ assertExpectedNextValueForSequence(1);
+ assertExpectedCurrentValueForSequence(1);
+ assertExpectedNextValueForSequence(2);
+
+ // Bulk Allocate Sequence Slots
+ final long startValueAfterAllocation1 = 101;
+ final long startValueAfterAllocation2 = 1101;
+ final long numSlotToAllocate = props.numAllocated;
+
+ // Setup and run tasks in independent Threads
+ ExecutorService executorService = Executors.newCachedThreadPool();
+ try {
+ final CountDownLatch latch1 = new CountDownLatch(1);
+ final CountDownLatch latch2 = new CountDownLatch(1);
+
+ Callable<Long> task1 = new Callable<Long>() {
+
+ @Override
+ public Long call() throws Exception {
+ ResultSet rs =
+ conn.createStatement().executeQuery(
+ "SELECT NEXT " + numSlotToAllocate + " VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\"");
+ latch1.countDown(); // Allows NEXT VALUE FOR thread to proceed
+ latch2.await(); // Waits until NEXT VALUE FOR thread reads and increments currentValue
+ rs.next();
+ return rs.getLong(1);
+ }
+
+ };
+
+ Callable<Long> task2 = new Callable<Long>() {
+
+ @Override
+ public Long call() throws Exception {
+ latch1.await(); // Wait for execution of NEXT <n> VALUES FOR expression
+ ResultSet rs =
+ conn.createStatement().executeQuery(
+ "SELECT NEXT VALUE FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\"");
+ rs.next();
+ long retVal = rs.getLong(1);
+ latch2.countDown(); // Allow NEXT <n> VALUES for thread to completed
+ return retVal;
+ }
+
+ };
+
+ @SuppressWarnings("unchecked")
+ List<Future<Long>> futures = executorService.invokeAll(Lists.newArrayList(task1, task2), 20, TimeUnit.SECONDS);
+ assertEquals(startValueAfterAllocation1, futures.get(0).get(10, TimeUnit.SECONDS).longValue());
+ assertEquals(startValueAfterAllocation2, futures.get(1).get(10, TimeUnit.SECONDS).longValue());
+ } finally {
+ executorService.shutdown();
+ }
+ }
+
+ @Test
+ public void testMultipleNextValuesWithDiffAllocsForMultiThreaded() throws Exception {
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(1).startsWith(1).cacheSize(100)
+ .numAllocated(1000).build();
+
+ nextConnection();
+ createSequenceWithNoMinMax(props);
+ nextConnection();
+
+ assertExpectedNextValueForSequence(1);
+ assertExpectedCurrentValueForSequence(1);
+ assertExpectedNextValueForSequence(2);
+
+ // Bulk Allocate Sequence Slots
+ final long startValueAfterAllocation1 = 101;
+ final long startValueAfterAllocation2 = 1101;
+ final long numSlotToAllocate1 = 1000;
+ final long numSlotToAllocate2 = 100;
+
+ // Setup and run tasks in independent Threads
+ ExecutorService executorService = Executors.newCachedThreadPool();
+ try {
+ final CountDownLatch latch1 = new CountDownLatch(1);
+ final CountDownLatch latch2 = new CountDownLatch(1);
+
+ Callable<Long> task1 = new Callable<Long>() {
+
+ @Override
+ public Long call() throws Exception {
+ ResultSet rs =
+ conn.createStatement().executeQuery(
+ "SELECT NEXT " + numSlotToAllocate1 + " VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\"");
+ rs.next();
+ latch1.countDown(); // Allows other thread to proceed
+ latch2.await();
+ return rs.getLong(1);
+ }
+
+ };
+
+ Callable<Long> task2 = new Callable<Long>() {
+
+ @Override
+ public Long call() throws Exception {
+ latch1.await(); // Wait for other thread to execut of NEXT <n> VALUES FOR expression
+ ResultSet rs =
+ conn.createStatement().executeQuery(
+ "SELECT NEXT " + numSlotToAllocate2 + " VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\"");
+ rs.next();
+ long retVal = rs.getLong(1);
+ latch2.countDown(); // Allow thread to completed
+ return retVal;
+ }
+
+ };
+
+ @SuppressWarnings("unchecked")
+ List<Future<Long>> futures = executorService.invokeAll(Lists.newArrayList(task1, task2), 5, TimeUnit.SECONDS);
+
+ // Retrieve value from Thread running NEXT <n> VALUES FOR
+ Long retValue1 = futures.get(0).get(5, TimeUnit.SECONDS);
+ assertEquals(startValueAfterAllocation1, retValue1.longValue());
+
+ // Retrieve value from Thread running NEXT VALUE FOR
+ Long retValue2 = futures.get(1).get(5, TimeUnit.SECONDS);
+ assertEquals(startValueAfterAllocation2, retValue2.longValue());
+
+ } finally {
+ executorService.shutdown();
+ }
+ }
+
+ @Test
+ public void testMultipleNextValuesWithSameAllocsForMultiThreaded() throws Exception {
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(1).startsWith(1).cacheSize(100)
+ .numAllocated(1000).build();
+
+ nextConnection();
+ createSequenceWithNoMinMax(props);
+ nextConnection();
+
+ // Bulk Allocate Sequence Slots
+ final long startValueAfterAllocation1 = 1;
+ final long startValueAfterAllocation2 = 1001;
+ final long numSlotToAllocate1 = 1000;
+ final long numSlotToAllocate2 = 1000;
+
+ // Setup and run tasks in independent Threads
+ ExecutorService executorService = Executors.newCachedThreadPool();
+ try {
+ final CountDownLatch latch1 = new CountDownLatch(1);
+ final CountDownLatch latch2 = new CountDownLatch(1);
+
+ Callable<Long> task1 = new Callable<Long>() {
+
+ @Override
+ public Long call() throws Exception {
+ ResultSet rs =
+ conn.createStatement().executeQuery(
+ "SELECT NEXT " + numSlotToAllocate1 + " VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\"");
+ latch1.countDown(); // Allows other thread to proceed
+ latch2.await();
+ rs.next();
+ return rs.getLong(1);
+ }
+
+ };
+
+ Callable<Long> task2 = new Callable<Long>() {
+
+ @Override
+ public Long call() throws Exception {
+ latch1.await(); // Wait for other thread to execut of NEXT <n> VALUES FOR expression
+ ResultSet rs =
+ conn.createStatement().executeQuery(
+ "SELECT NEXT " + numSlotToAllocate2 + " VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\"");
+ rs.next();
+ long retVal = rs.getLong(1);
+ latch2.countDown(); // Allow thread to completed
+ return retVal;
+ }
+
+ };
+
+ // Because of the way the threads are interleaved the ranges used by each thread will the reserve
+ // of the order to statement execution
+ @SuppressWarnings("unchecked")
+ List<Future<Long>> futures = executorService.invokeAll(Lists.newArrayList(task1, task2), 5, TimeUnit.SECONDS);
+ assertEquals(startValueAfterAllocation2, futures.get(0).get(5, TimeUnit.SECONDS).longValue());
+ assertEquals(startValueAfterAllocation1, futures.get(1).get(5, TimeUnit.SECONDS).longValue());
+
+ } finally {
+ executorService.shutdown();
+ }
+ }
+
+ // -----------------------------------------------------------------
+ // Private Helper Methods
+ // -----------------------------------------------------------------
+ private void assertBulkAllocationSucceeded(SequenceProperties props,
+ int currentValueAfterAllocation, int startValueAfterAllocation) throws SQLException {
+ int nextValueAfterAllocation = currentValueAfterAllocation + props.incrementBy;
+ assertExpectedStateInSystemSequence(props, nextValueAfterAllocation);
+ assertExpectedNumberOfValuesAllocated(startValueAfterAllocation, currentValueAfterAllocation, props.incrementBy, props.numAllocated);
+ }
+
+
+ private void createSequenceWithNoMinMax(final SequenceProperties props) throws SQLException {
+ conn.createStatement().execute(
+ String.format(CREATE_SEQUENCE_NO_MIN_MAX_TEMPLATE, props.startsWith,
+ props.incrementBy, props.cacheSize));
+ }
+
+ private void createSequenceWithMinMax(final SequenceProperties props) throws SQLException {
+ conn.createStatement().execute(
+ String.format(CREATE_SEQUENCE_WITH_MIN_MAX_TEMPLATE, props.startsWith,
+ props.incrementBy, props.minValue, props.maxValue, props.cacheSize));
+ }
+
+ private void createSequenceWithMinMaxAndCycle(final SequenceProperties props) throws SQLException {
+ conn.createStatement().execute(
+ String.format(CREATE_SEQUENCE_WITH_MIN_MAX_AND_CYCLE_TEMPLATE, props.startsWith,
+ props.incrementBy, props.minValue, props.maxValue, props.cacheSize));
+ }
+
+ private void reserveSlotsInBulkAndAssertValue(long expectedValue, long numSlotToAllocate)
+ throws SQLException {
+ ResultSet rs =
+ conn.createStatement().executeQuery(
+ "SELECT NEXT " + numSlotToAllocate + " VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\"");
+ assertTrue(rs.next());
+ assertEquals(expectedValue, rs.getInt(1));
+ }
+
+ private void reserveSlotsInBulkUsingBindsAndAssertValue(int expectedValue, long numSlotToAllocate)
+ throws SQLException {
+ PreparedStatement ps = conn.prepareStatement("SELECT NEXT ? VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\"");
+ ps.setLong(1, numSlotToAllocate);
+ ResultSet rs = ps.executeQuery();
+ assertTrue(rs.next());
+ int retValue = rs.getInt(1);
+ assertEquals(expectedValue, retValue);
+ }
+
+ private void assertExpectedCurrentValueForSequence(int expectedValue) throws SQLException {
+ assertExpectedCurrentValueForSequence(expectedValue, "bulkalloc.alpha");
+ }
+
+ private void assertExpectedCurrentValueForSequence(int expectedValue, String sequenceName) throws SQLException {
+ ResultSet rs;
+ rs = conn.createStatement().executeQuery(String.format(SELECT_CURRENT_VALUE_SQL, sequenceName));
+ assertTrue(rs.next());
+ assertEquals(expectedValue, rs.getInt(1));
+ }
+
+ private void assertExpectedNextValueForSequence(int expectedValue) throws SQLException {
+ assertExpectedNextValueForSequence(expectedValue, "bulkalloc.alpha");
+ }
+
+ private void assertExpectedNextValueForSequence(int expectedValue, String sequenceName) throws SQLException {
+ ResultSet rs;
+ rs = conn.createStatement().executeQuery(String.format(SELECT_NEXT_VALUE_SQL, sequenceName));
+ assertTrue(rs.next());
+ assertEquals(expectedValue, rs.getInt(1));
+ }
+
+
+ /**
+ * Returns a non-tenant specific connection.
+ */
+ private void nextGenericConnection() throws Exception {
+ if (conn != null) conn.close();
+ long ts = nextTimestamp();
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
+ conn = DriverManager.getConnection(getUrl(), props);
+ }
+
+ private void nextConnection() throws Exception {
+ if (conn != null) conn.close();
+ long ts = nextTimestamp();
+ if (tenantId != null) {
+ // Create tenant specific connection
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(nextTimestamp()));
+ this.conn = DriverManager.getConnection(getUrl() + ';' + TENANT_ID_ATTRIB + '=' + "tenant1", props);
+
+ } else {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
+ conn = DriverManager.getConnection(getUrl(), props);
+ }
+ }
+
+ private void assertExpectedStateInSystemSequence(SequenceProperties props, long currentValue)
+ throws SQLException {
+ // Validate state in System.Sequence
+ ResultSet rs =
+ conn.createStatement()
+ .executeQuery(
+ "SELECT start_with, current_value, increment_by, cache_size, min_value, max_value, cycle_flag, sequence_schema, sequence_name FROM SYSTEM.\"SEQUENCE\"");
+ assertTrue(rs.next());
+ assertEquals(props.startsWith, rs.getLong("start_with"));
+ assertEquals(props.incrementBy, rs.getLong("increment_by"));
+ assertEquals(props.cacheSize, rs.getLong("cache_size"));
+ assertEquals(false, rs.getBoolean("cycle_flag"));
+ assertEquals("BULKALLOC", rs.getString("sequence_schema"));
+ assertEquals("ALPHA", rs.getString("sequence_name"));
+ assertEquals(currentValue, rs.getLong("current_value"));
+ assertEquals(props.minValue, rs.getLong("min_value"));
+ assertEquals(props.maxValue, rs.getLong("max_value"));
+ assertFalse(rs.next());
+ }
+
+ private void assertExpectedNumberOfValuesAllocated(long firstValue, long lastValue,
+ int incrementBy, long numAllocated) {
+ int cnt = 0;
+ for (long i = firstValue; (incrementBy > 0 ? i <= lastValue : i >= lastValue); i += incrementBy) {
+ cnt++;
+ }
+ assertEquals("Incorrect number of values allocated: " + cnt, numAllocated, cnt);
+ }
+
+ private static class SequenceProperties {
+
+ private final long numAllocated;
+ private final int incrementBy;
+ private final int startsWith;
+ private final int cacheSize;
+ private final long minValue;
+ private final long maxValue;
+
+ public SequenceProperties(Builder builder) {
+ this.numAllocated = builder.numAllocated;
+ this.incrementBy = builder.incrementBy;
+ this.startsWith = builder.startsWith;
+ this.cacheSize = builder.cacheSize;
+ this.minValue = builder.minValue;
+ this.maxValue = builder.maxValue;
+ }
+
+ private static class Builder {
+
+ long maxValue = Long.MAX_VALUE;
+ long minValue = Long.MIN_VALUE;
+ long numAllocated = 100;
+ int incrementBy = 1;
+ int startsWith = 1;
+ int cacheSize = 100;
+
+ public Builder numAllocated(long numAllocated) {
+ this.numAllocated = numAllocated;
+ return this;
+ }
+
+ public Builder startsWith(int startsWith) {
+ this.startsWith = startsWith;
+ return this;
+ }
+
+ public Builder cacheSize(int cacheSize) {
+ this.cacheSize = cacheSize;
+ return this;
+ }
+
+ public Builder incrementBy(int incrementBy) {
+ this.incrementBy = incrementBy;
+ return this;
+ }
+
+ public Builder minValue(long minValue) {
+ this.minValue = minValue;
+ return this;
+ }
+
+ public Builder maxValue(long maxValue) {
+ this.maxValue = maxValue;
+ return this;
+ }
+
+ public SequenceProperties build() {
+ return new SequenceProperties(this);
+ }
+
+ }
+
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/antlr3/PhoenixSQL.g
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g b/phoenix-core/src/main/antlr3/PhoenixSQL.g
index ca5e778..10fda68 100644
--- a/phoenix-core/src/main/antlr3/PhoenixSQL.g
+++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g
@@ -903,7 +903,10 @@ term returns [ParseNode ret]
}
| (n=NEXT | CURRENT) VALUE FOR s=from_table_name
{ contextStack.peek().hasSequences(true);
- $ret = n==null ? factory.currentValueFor(s) : factory.nextValueFor(s); }
+ $ret = n==null ? factory.currentValueFor(s) : factory.nextValueFor(s, null); }
+ | (n=NEXT) lorb=literal_or_bind VALUES FOR s=from_table_name
+ { contextStack.peek().hasSequences(true);
+ $ret = factory.nextValueFor(s, lorb); }
;
one_or_more_expressions returns [List<ParseNode> ret]
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
index ede6d9c..5ec8cd2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
@@ -24,16 +24,23 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.parse.ParseNode;
import org.apache.phoenix.parse.SequenceValueParseNode;
import org.apache.phoenix.parse.SequenceValueParseNode.Op;
import org.apache.phoenix.parse.TableName;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.Sequence;
+import org.apache.phoenix.schema.SequenceAllocation;
import org.apache.phoenix.schema.SequenceKey;
import org.apache.phoenix.schema.tuple.DelegateTuple;
import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.util.SequenceUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -41,7 +48,7 @@ import com.google.common.collect.Maps;
public class SequenceManager {
private final PhoenixStatement statement;
private int[] sequencePosition;
- private List<SequenceKey> nextSequences;
+ private List<SequenceAllocation> nextSequences;
private List<SequenceKey> currentSequences;
private final Map<SequenceKey,SequenceValueExpression> sequenceMap = Maps.newHashMap();
private final BitSet isNextSequence = new BitSet();
@@ -113,20 +120,30 @@ public class SequenceManager {
}
}
- public SequenceValueExpression newSequenceReference(SequenceValueParseNode node) {
+ public SequenceValueExpression newSequenceReference(SequenceValueParseNode node) throws SQLException {
PName tenantName = statement.getConnection().getTenantId();
String tenantId = tenantName == null ? null : tenantName.getString();
TableName tableName = node.getTableName();
int nSaltBuckets = statement.getConnection().getQueryServices().getSequenceSaltBuckets();
+ ParseNode numToAllocateNode = node.getNumToAllocateNode();
+
+ long numToAllocate = determineNumToAllocate(tableName, numToAllocateNode);
SequenceKey key = new SequenceKey(tenantId, tableName.getSchemaName(), tableName.getTableName(), nSaltBuckets);
SequenceValueExpression expression = sequenceMap.get(key);
if (expression == null) {
int index = sequenceMap.size();
- expression = new SequenceValueExpression(key, node.getOp(), index);
+ expression = new SequenceValueExpression(key, node.getOp(), index, numToAllocate);
sequenceMap.put(key, expression);
- } else if (expression.op != node.getOp()){
- expression = new SequenceValueExpression(key, node.getOp(), expression.getIndex());
- }
+ } else if (expression.op != node.getOp() || expression.getNumToAllocate() < numToAllocate) {
+ // Keep the maximum allocation size we see in a statement
+ SequenceValueExpression oldExpression = expression;
+ expression = new SequenceValueExpression(key, node.getOp(), expression.getIndex(), Math.max(expression.getNumToAllocate(), numToAllocate));
+ if (oldExpression.getNumToAllocate() < numToAllocate) {
+ // If we found a NEXT VALUE expression with a higher number to allocate
+ // We override the original expression
+ sequenceMap.put(key, expression);
+ }
+ }
// If we see a NEXT and a CURRENT, treat the CURRENT just like a NEXT
if (node.getOp() == Op.NEXT_VALUE) {
isNextSequence.set(expression.getIndex());
@@ -134,6 +151,38 @@ public class SequenceManager {
return expression;
}
+
+ /**
+ * If caller specified used NEXT <n> VALUES FOR <seq> expression then we have set the numToAllocate.
+ * If numToAllocate is > 1 we treat this as a bulk reservation of a block of sequence slots.
+ *
+ * @throws a SQLException if we can't compile the expression
+ */
+ private long determineNumToAllocate(TableName sequenceName, ParseNode numToAllocateNode)
+ throws SQLException {
+
+ if (numToAllocateNode != null) {
+ final StatementContext context = new StatementContext(statement);
+ ExpressionCompiler expressionCompiler = new ExpressionCompiler(context);
+ Expression expression = numToAllocateNode.accept(expressionCompiler);
+ ImmutableBytesWritable ptr = context.getTempPtr();
+ expression.evaluate(null, ptr);
+ if (ptr.getLength() == 0 || !expression.getDataType().isCoercibleTo(PLong.INSTANCE)) {
+ throw SequenceUtil.getException(sequenceName.getSchemaName(), sequenceName.getTableName(), SQLExceptionCode.NUM_SEQ_TO_ALLOCATE_MUST_BE_CONSTANT);
+ }
+
+ // Parse <n> and make sure it is greater than 0. We don't support allocating 0 or negative values!
+ long numToAllocate = (long) PLong.INSTANCE.toObject(ptr, expression.getDataType());
+ if (numToAllocate < 1) {
+ throw SequenceUtil.getException(sequenceName.getSchemaName(), sequenceName.getTableName(), SQLExceptionCode.NUM_SEQ_TO_ALLOCATE_MUST_BE_CONSTANT);
+ }
+ return numToAllocate;
+
+ } else {
+ // Standard Sequence Allocation Behavior
+ return SequenceUtil.DEFAULT_NUM_SLOTS_TO_ALLOCATE;
+ }
+ }
public void validateSequences(Sequence.ValueOp action) throws SQLException {
if (sequenceMap.isEmpty()) {
@@ -146,14 +195,17 @@ public class SequenceManager {
currentSequences = Lists.newArrayListWithExpectedSize(maxSize);
for (Map.Entry<SequenceKey, SequenceValueExpression> entry : sequenceMap.entrySet()) {
if (isNextSequence.get(entry.getValue().getIndex())) {
- nextSequences.add(entry.getKey());
+ nextSequences.add(new SequenceAllocation(entry.getKey(), entry.getValue().getNumToAllocate()));
} else {
currentSequences.add(entry.getKey());
}
}
long[] srcSequenceValues = new long[nextSequences.size()];
SQLException[] sqlExceptions = new SQLException[nextSequences.size()];
+
+ // Sort the next sequences to prevent deadlocks
Collections.sort(nextSequences);
+
// Create reverse indexes
for (int i = 0; i < nextSequences.size(); i++) {
sequencePosition[i] = sequenceMap.get(nextSequences.get(i)).getIndex();
@@ -168,4 +220,6 @@ public class SequenceManager {
services.validateSequences(nextSequences, timestamp, srcSequenceValues, sqlExceptions, action);
setSequenceValues(srcSequenceValues, dstSequenceValues, sqlExceptions);
}
-}
+
+}
+
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceValueExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceValueExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceValueExpression.java
index cdaae68..71e2d02 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceValueExpression.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceValueExpression.java
@@ -32,13 +32,23 @@ public class SequenceValueExpression extends BaseTerminalExpression {
private final SequenceKey key;
final Op op;
private final int index;
+ private final long numToAllocate;
- public SequenceValueExpression(SequenceKey key, Op op, int index) {
+ public SequenceValueExpression(SequenceKey key, Op op, int index, long numToAllocate) {
this.key = key;
this.op = op;
this.index = index;
+ this.numToAllocate = numToAllocate;
}
+ public long getNumToAllocate() {
+ return numToAllocate;
+ }
+
+ public SequenceKey getKey() {
+ return key;
+ }
+
public int getIndex() {
return index;
}
@@ -73,7 +83,7 @@ public class SequenceValueExpression extends BaseTerminalExpression {
@Override
public String toString() {
- return op.getName() + " VALUE FOR " + SchemaUtil.getTableName(key.getSchemaName(),key.getSequenceName());
+ return op.getName() + (numToAllocate == 1 ? " VALUE " : (" " + numToAllocate + " VALUES " )) + "FOR " + SchemaUtil.getTableName(key.getSchemaName(),key.getSequenceName());
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
index 9b5f040..bb6c4b5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
@@ -41,17 +41,17 @@ import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Region.RowLock;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.Sequence;
+import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.types.PBoolean;
-import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.schema.types.PLong;
-import org.apache.phoenix.schema.Sequence;
-import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.KeyValueUtil;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.SequenceUtil;
@@ -77,6 +77,7 @@ public class SequenceRegionObserver extends BaseRegionObserver {
public static final String OPERATION_ATTRIB = "SEQUENCE_OPERATION";
public static final String MAX_TIMERANGE_ATTRIB = "MAX_TIMERANGE";
public static final String CURRENT_VALUE_ATTRIB = "CURRENT_VALUE";
+ public static final String NUM_TO_ALLOCATE = "NUM_TO_ALLOCATE";
private static final byte[] SUCCESS_VALUE = PInteger.INSTANCE.toBytes(Integer.valueOf(Sequence.SUCCESS));
private static Result getErrorResult(byte[] row, long timestamp, int errorCode) {
@@ -138,10 +139,8 @@ public class SequenceRegionObserver extends BaseRegionObserver {
if (result.isEmpty()) {
return getErrorResult(row, maxTimestamp, SQLExceptionCode.SEQUENCE_UNDEFINED.getErrorCode());
}
- if (validateOnly) {
- return result;
- }
+
KeyValue currentValueKV = Sequence.getCurrentValueKV(result);
KeyValue incrementByKV = Sequence.getIncrementByKV(result);
KeyValue cacheSizeKV = Sequence.getCacheSizeKV(result);
@@ -224,6 +223,28 @@ public class SequenceRegionObserver extends BaseRegionObserver {
cycleKV.getValueOffset(), cycleKV.getValueLength());
}
+ long numSlotsToAllocate = calculateNumSlotsToAllocate(increment);
+
+ // We don't support Bulk Allocations on sequences that have the CYCLE flag set to true
+ if (cycle && !SequenceUtil.isCycleAllowed(numSlotsToAllocate)) {
+ return getErrorResult(row, maxTimestamp, SQLExceptionCode.NUM_SEQ_TO_ALLOCATE_NOT_SUPPORTED.getErrorCode());
+ }
+
+ // Bulk Allocations are expressed by NEXT <n> VALUES FOR
+ if (SequenceUtil.isBulkAllocation(numSlotsToAllocate)) {
+ if (SequenceUtil.checkIfLimitReached(currentValue, minValue, maxValue, incrementBy, cacheSize, numSlotsToAllocate)) {
+ // If we try to allocate more slots than the limit we return an error.
+ // Allocating sequence values in bulk should be an all or nothing operation.
+ // If the operation succeeds clients are guaranteed that they have reserved
+ // all the slots requested.
+ return getErrorResult(row, maxTimestamp, SequenceUtil.getLimitReachedErrorCode(increasingSeq).getErrorCode());
+ }
+ }
+
+ if (validateOnly) {
+ return result;
+ }
+
// return if we have run out of sequence values
if (limitReached) {
if (cycle) {
@@ -231,16 +252,15 @@ public class SequenceRegionObserver extends BaseRegionObserver {
currentValue = increasingSeq ? minValue : maxValue;
}
else {
- SQLExceptionCode code = increasingSeq ? SQLExceptionCode.SEQUENCE_VAL_REACHED_MAX_VALUE
- : SQLExceptionCode.SEQUENCE_VAL_REACHED_MIN_VALUE;
- return getErrorResult(row, maxTimestamp, code.getErrorCode());
+ return getErrorResult(row, maxTimestamp, SequenceUtil.getLimitReachedErrorCode(increasingSeq).getErrorCode());
}
}
-
+
// check if the limit was reached
- limitReached = SequenceUtil.checkIfLimitReached(currentValue, minValue, maxValue, incrementBy, cacheSize);
- // update currentValue
- currentValue += incrementBy * cacheSize;
+ limitReached = SequenceUtil.checkIfLimitReached(currentValue, minValue, maxValue, incrementBy, cacheSize, numSlotsToAllocate);
+
+ // update currentValue
+ currentValue += incrementBy * (SequenceUtil.isBulkAllocation(numSlotsToAllocate) ? numSlotsToAllocate : cacheSize);
// update the currentValue of the Result row
KeyValue newCurrentValueKV = createKeyValue(row, PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES, currentValue, timestamp);
Sequence.replaceCurrentValueKV(cells, newCurrentValueKV);
@@ -264,6 +284,7 @@ public class SequenceRegionObserver extends BaseRegionObserver {
region.closeRegionOperation();
}
}
+
/**
* Creates a new KeyValue for a long value
@@ -417,5 +438,20 @@ public class SequenceRegionObserver extends BaseRegionObserver {
region.closeRegionOperation();
}
}
+
+ /**
+ * Determines whether a request for incrementing the sequence was a bulk allocation and if so
+ * what the number of slots to allocate is. This is triggered by the NEXT <n> VALUES FOR expression.
+ * For backwards compatibility with older clients, we default the value to 1 which preserves
+ * existing behavior when invoking NEXT VALUE FOR.
+ */
+ private long calculateNumSlotsToAllocate(final Increment increment) {
+ long numToAllocate = 1;
+ byte[] numToAllocateBytes = increment.getAttribute(SequenceRegionObserver.NUM_TO_ALLOCATE);
+ if (numToAllocateBytes != null) {
+ numToAllocate = Bytes.toLong(numToAllocateBytes);
+ }
+ return numToAllocate;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index cc8b02a..b9f81fb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -276,6 +276,8 @@ public enum SQLExceptionCode {
SEQUENCE_VAL_REACHED_MAX_VALUE(1212, "42Z12", "Reached MAXVALUE of sequence"),
SEQUENCE_VAL_REACHED_MIN_VALUE(1213, "42Z13", "Reached MINVALUE of sequence"),
INCREMENT_BY_MUST_NOT_BE_ZERO(1214, "42Z14", "Sequence INCREMENT BY value cannot be zero"),
+ NUM_SEQ_TO_ALLOCATE_MUST_BE_CONSTANT(1215, "42Z15", "Sequence NEXT n VALUES FOR must be a postive integer or constant." ),
+ NUM_SEQ_TO_ALLOCATE_NOT_SUPPORTED(1216, "42Z16", "Sequence NEXT n VALUES FOR is not supported for Sequences with the CYCLE flag" ),
/** Parser error. (errorcode 06, sqlState 42P) */
PARSER_ERROR(601, "42P00", "Syntax error.", Factory.SYTAX_ERROR),
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index f323ec4..9689589 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -232,7 +232,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
}
protected QueryPlan optimizeQuery(CompilableStatement stmt) throws SQLException {
- QueryPlan plan = stmt.compilePlan(this, Sequence.ValueOp.RESERVE_SEQUENCE);
+ QueryPlan plan = stmt.compilePlan(this, Sequence.ValueOp.VALIDATE_SEQUENCE);
return connection.getQueryServices().getOptimizer().optimize(this, plan);
}
@@ -245,7 +245,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
public PhoenixResultSet call() throws SQLException {
final long startTime = System.currentTimeMillis();
try {
- QueryPlan plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.RESERVE_SEQUENCE);
+ QueryPlan plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.VALIDATE_SEQUENCE);
plan = connection.getQueryServices().getOptimizer().optimize(
PhoenixStatement.this, plan);
// this will create its own trace internally, so we don't wrap this
@@ -303,7 +303,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
// since they'd update data directly from coprocessors, and should thus operate on
// the latest state
try {
- MutationPlan plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.RESERVE_SEQUENCE);
+ MutationPlan plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.VALIDATE_SEQUENCE);
MutationState state = plan.execute();
connection.getMutationState().join(state);
if (connection.getAutoCommit()) {
@@ -1203,14 +1203,14 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
if (stmt.getOperation().isMutation()) {
throw new ExecuteQueryNotApplicableException(query);
}
- return stmt.compilePlan(this, Sequence.ValueOp.RESERVE_SEQUENCE);
+ return stmt.compilePlan(this, Sequence.ValueOp.VALIDATE_SEQUENCE);
}
public MutationPlan compileMutation(CompilableStatement stmt, String query) throws SQLException {
if (!stmt.getOperation().isMutation()) {
throw new ExecuteUpdateNotApplicableException(query);
}
- return stmt.compilePlan(this, Sequence.ValueOp.RESERVE_SEQUENCE);
+ return stmt.compilePlan(this, Sequence.ValueOp.VALIDATE_SEQUENCE);
}
public MutationPlan compileMutation(String sql) throws SQLException {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
index 44359a7..49b14c6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
@@ -310,11 +310,11 @@ public class ParseNodeFactory {
}
public SequenceValueParseNode currentValueFor(TableName tableName) {
- return new SequenceValueParseNode(tableName, SequenceValueParseNode.Op.CURRENT_VALUE);
+ return new SequenceValueParseNode(tableName, SequenceValueParseNode.Op.CURRENT_VALUE, null);
}
- public SequenceValueParseNode nextValueFor(TableName tableName) {
- return new SequenceValueParseNode(tableName, SequenceValueParseNode.Op.NEXT_VALUE);
+ public SequenceValueParseNode nextValueFor(TableName tableName, ParseNode numToAllocateNode) {
+ return new SequenceValueParseNode(tableName, SequenceValueParseNode.Op.NEXT_VALUE, numToAllocateNode);
}
public AddColumnStatement addColumn(NamedTableNode table, PTableType tableType, List<ColumnDef> columnDefs, boolean ifNotExists, ListMultimap<String,Pair<String,Object>> props) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/parse/SequenceValueParseNode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/SequenceValueParseNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/SequenceValueParseNode.java
index a5d60fe..1fc670c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/SequenceValueParseNode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/SequenceValueParseNode.java
@@ -39,10 +39,16 @@ public class SequenceValueParseNode extends TerminalParseNode {
}
private final TableName tableName;
private final Op op;
+ private final ParseNode numToAllocate;
- public SequenceValueParseNode(TableName tableName, Op op) {
+ public SequenceValueParseNode(TableName tableName, Op op, ParseNode numToAllocate) {
this.tableName = tableName;
this.op = op;
+ this.numToAllocate = numToAllocate;
+ }
+
+ public ParseNode getNumToAllocateNode() {
+ return numToAllocate;
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index dc51b10..3b53309 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -40,6 +40,7 @@ import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.Sequence;
+import org.apache.phoenix.schema.SequenceAllocation;
import org.apache.phoenix.schema.SequenceKey;
import org.apache.phoenix.schema.stats.PTableStats;
@@ -91,8 +92,8 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated
long createSequence(String tenantId, String schemaName, String sequenceName, long startWith, long incrementBy, long cacheSize, long minValue, long maxValue, boolean cycle, long timestamp) throws SQLException;
long dropSequence(String tenantId, String schemaName, String sequenceName, long timestamp) throws SQLException;
- void validateSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values, SQLException[] exceptions, Sequence.ValueOp action) throws SQLException;
- void incrementSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values, SQLException[] exceptions) throws SQLException;
+ void validateSequences(List<SequenceAllocation> sequenceAllocations, long timestamp, long[] values, SQLException[] exceptions, Sequence.ValueOp action) throws SQLException;
+ void incrementSequences(List<SequenceAllocation> sequenceAllocation, long timestamp, long[] values, SQLException[] exceptions) throws SQLException;
long currentSequenceValue(SequenceKey sequenceKey, long timestamp) throws SQLException;
void returnSequences(List<SequenceKey> sequenceKeys, long timestamp, SQLException[] exceptions) throws SQLException;