You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/12/24 09:46:05 UTC
[2/6] camel git commit: Add {at,after}-sequence-number semantics to the shard list container
Add {at,after}-sequence-number semantics to the shard list container
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e2b9d91c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e2b9d91c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e2b9d91c
Branch: refs/heads/master
Commit: e2b9d91c3428e659c36799466de7abe709c4d941
Parents: 0350423
Author: Candle <ca...@candle.me.uk>
Authored: Wed Dec 16 14:58:02 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Dec 24 09:45:31 2015 +0100
----------------------------------------------------------------------
.../component/aws/ddbstream/ShardList.java | 67 ++++++++++++++++++++
.../ShardListAfterSequenceParametrised.java | 57 +++++++++++++++++
.../ShardListAtSequenceParametrised.java | 57 +++++++++++++++++
.../component/aws/ddbstream/ShardListTest.java | 24 ++++++-
4 files changed, 204 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/e2b9d91c/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
index a0df179..3ae1c4e 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
@@ -21,6 +21,11 @@ import java.util.HashMap;
import java.util.Map;
import com.amazonaws.services.dynamodbv2.model.Shard;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,6 +75,35 @@ class ShardList {
throw new IllegalStateException("Unable to find a shard with no children " + shards);
}
+ Shard afterSeq(String sequenceNumber) {
+ return atAfterSeq(sequenceNumber, After.INSTANCE);
+ }
+
+ Shard atSeq(String sequenceNumber) {
+ return atAfterSeq(sequenceNumber, At.INSTANCE);
+ }
+
+ Shard atAfterSeq(String sequenceNumber, AtAfterCondition condition) {
+ BigInteger atAfter = new BigInteger(sequenceNumber);
+ List<Shard> sorted = new ArrayList<>();
+ sorted.addAll(shards.values());
+ Collections.sort(sorted, StartingSequenceNumberComparator.INSTANCE);
+ for (Shard shard : sorted) {
+ if (shard.getSequenceNumberRange().getEndingSequenceNumber() != null) {
+ BigInteger end = new BigInteger(shard.getSequenceNumberRange().getEndingSequenceNumber());
+ // essentially: after < end or after <= end
+ if (condition.matches(atAfter, end)) {
+ return shard;
+ }
+
+ }
+ }
+ if (shards.size() > 0) {
+ return sorted.get(sorted.size()-1);
+ }
+ throw new IllegalStateException("Unable to find a shard with appropriate sequence numbers for " + sequenceNumber + " in " + shards);
+ }
+
/**
* Removes shards that are older than the provided shard.
* Does not remove the provided shard.
@@ -95,4 +129,37 @@ class ShardList {
public String toString() {
return "ShardList{" + "shards=" + shards + '}';
}
+
+ /**
+  * Condition applied by {@code atAfterSeq} to decide whether a shard matches the
+  * sequence number that was asked for.
+  */
+ private interface AtAfterCondition {
+ /**
+  * @param sequenceNumber the sequence number being looked up
+  * @param option the sequence number to test it against ({@code atAfterSeq} passes a shard's ending sequence number)
+  * @return {@code true} if the pair satisfies this condition
+  */
+ boolean matches(BigInteger sequenceNumber, BigInteger option);
+ }
+
+ /**
+  * "After sequence number" condition: matches only when the provided sequence number
+  * is strictly less than the shard's sequence number.
+  */
+ private enum After implements AtAfterCondition {
+     INSTANCE;
+
+     @Override
+     public boolean matches(BigInteger providedSequenceNumber, BigInteger shardSequenceNumber) {
+         final int order = providedSequenceNumber.compareTo(shardSequenceNumber);
+         return order < 0;
+     }
+ }
+
+ /**
+  * "At sequence number" condition: matches when the provided sequence number
+  * is less than or equal to the shard's sequence number.
+  */
+ private enum At implements AtAfterCondition {
+     INSTANCE;
+
+     @Override
+     public boolean matches(BigInteger providedSequenceNumber, BigInteger shardSequenceNumber) {
+         final int order = providedSequenceNumber.compareTo(shardSequenceNumber);
+         return order <= 0;
+     }
+ }
+
+ /**
+  * Orders shards by ascending starting sequence number, compared numerically
+  * as {@link BigInteger} values rather than as strings.
+  */
+ private enum StartingSequenceNumberComparator implements Comparator<Shard> {
+     INSTANCE;
+
+     @Override
+     public int compare(Shard o1, Shard o2) {
+         final BigInteger left = startOf(o1);
+         final BigInteger right = startOf(o2);
+         return left.compareTo(right);
+     }
+
+     /** Parses the starting sequence number of the shard's sequence number range. */
+     private static BigInteger startOf(Shard shard) {
+         return new BigInteger(shard.getSequenceNumberRange().getStartingSequenceNumber());
+     }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/e2b9d91c/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAfterSequenceParametrised.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAfterSequenceParametrised.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAfterSequenceParametrised.java
new file mode 100644
index 0000000..c33ebe2
--- /dev/null
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAfterSequenceParametrised.java
@@ -0,0 +1,57 @@
+package org.apache.camel.component.aws.ddbstream;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class ShardListAfterSequenceParametrised {
+
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> paramaters() {
+ List<Object[]> results = new ArrayList<>();
+ results.add(new Object[]{"0", "a"});
+ results.add(new Object[]{"3", "a"});
+ results.add(new Object[]{"6", "b"});
+ results.add(new Object[]{"8", "b"});
+ results.add(new Object[]{"15", "c"});
+ results.add(new Object[]{"16", "d"});
+ results.add(new Object[]{"18", "d"});
+ results.add(new Object[]{"25", "d"});
+ results.add(new Object[]{"30", "d"});
+ return results;
+ }
+
+ private ShardList undertest;
+
+ private final String inputSequenceNumber;
+ private final String expectedShardId;
+
+ public ShardListAfterSequenceParametrised(String inputSequenceNumber, String expectedShardId) {
+ this.inputSequenceNumber = inputSequenceNumber;
+ this.expectedShardId = expectedShardId;
+ }
+
+ @Before
+ public void setup() throws Exception {
+ undertest = new ShardList();
+ undertest.addAll(ShardListTest.createShardsWithSequenceNumbers(null,
+ "a", "1", "5",
+ "b", "8", "15",
+ "c", "16", "16",
+ "d", "20", null
+ ));
+ }
+
+ @Test
+ public void assertions() throws Exception {
+ assertThat(undertest.afterSeq(inputSequenceNumber).getShardId(), is(expectedShardId));
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e2b9d91c/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAtSequenceParametrised.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAtSequenceParametrised.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAtSequenceParametrised.java
new file mode 100644
index 0000000..cf15021
--- /dev/null
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAtSequenceParametrised.java
@@ -0,0 +1,57 @@
+package org.apache.camel.component.aws.ddbstream;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class ShardListAtSequenceParametrised {
+
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> paramaters() {
+ List<Object[]> results = new ArrayList<>();
+ results.add(new Object[]{"0", "a"});
+ results.add(new Object[]{"3", "a"});
+ results.add(new Object[]{"6", "b"});
+ results.add(new Object[]{"8", "b"});
+ results.add(new Object[]{"15", "b"});
+ results.add(new Object[]{"16", "c"});
+ results.add(new Object[]{"18", "d"});
+ results.add(new Object[]{"25", "d"});
+ results.add(new Object[]{"30", "d"});
+ return results;
+ }
+
+ private ShardList undertest;
+
+ private final String inputSequenceNumber;
+ private final String expectedShardId;
+
+ public ShardListAtSequenceParametrised(String inputSequenceNumber, String expectedShardId) {
+ this.inputSequenceNumber = inputSequenceNumber;
+ this.expectedShardId = expectedShardId;
+ }
+
+ @Before
+ public void setup() throws Exception {
+ undertest = new ShardList();
+ undertest.addAll(ShardListTest.createShardsWithSequenceNumbers(null,
+ "a", "1", "5",
+ "b", "8", "15",
+ "c", "16", "16",
+ "d", "20", null
+ ));
+ }
+
+ @Test
+ public void assertions() throws Exception {
+ assertThat(undertest.atSeq(inputSequenceNumber).getShardId(), is(expectedShardId));
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e2b9d91c/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
index 1b7249a..e181d7e 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.aws.ddbstream;
+import com.amazonaws.services.dynamodbv2.model.SequenceNumberRange;
import java.util.ArrayList;
import java.util.List;
import com.amazonaws.services.dynamodbv2.model.Shard;
@@ -24,6 +25,7 @@ import com.amazonaws.services.dynamodbv2.model.Shard;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
+import org.junit.Ignore;
public class ShardListTest {
@@ -118,7 +120,27 @@ public class ShardListTest {
assertThat(shards.first().getShardId(), is("c"));
}
- List<Shard> createShards(String initialParent, String... shardIds) {
+ /**
+  * Builds a chain of shards from a flat list of triples: shard id, starting sequence
+  * number, ending sequence number. Each shard's parent is the id of the shard created
+  * just before it; the first shard's parent is {@code initialParent}.
+  *
+  * @param initialParent parent shard id given to the first shard
+  * @param shardIdsAndSeqNos the triples, consumed three elements at a time
+  * @return the shards in the order their triples were given
+  */
+ static List<Shard> createShardsWithSequenceNumbers(String initialParent, String... shardIdsAndSeqNos) {
+     List<Shard> chain = new ArrayList<>();
+     String parentId = initialParent;
+     int cursor = 0;
+     while (cursor < shardIdsAndSeqNos.length) {
+         String shardId = shardIdsAndSeqNos[cursor];
+         String startingSeq = shardIdsAndSeqNos[cursor + 1];
+         String endingSeq = shardIdsAndSeqNos[cursor + 2];
+
+         SequenceNumberRange range = new SequenceNumberRange()
+                 .withStartingSequenceNumber(startingSeq)
+                 .withEndingSequenceNumber(endingSeq);
+         Shard shard = new Shard()
+                 .withShardId(shardId)
+                 .withParentShardId(parentId)
+                 .withSequenceNumberRange(range);
+         chain.add(shard);
+
+         parentId = shardId;
+         cursor += 3;
+     }
+     return chain;
+ }
+
+ static List<Shard> createShards(String initialParent, String... shardIds) {
String previous = initialParent;
List<Shard> result = new ArrayList<>();
for (String s : shardIds) {