You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/12/24 09:46:05 UTC

[2/6] camel git commit: Add {at, after}-sequence-number semantics to the shard list container

Add {at,after}-sequence-number semantics to the shard list container


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e2b9d91c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e2b9d91c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e2b9d91c

Branch: refs/heads/master
Commit: e2b9d91c3428e659c36799466de7abe709c4d941
Parents: 0350423
Author: Candle <ca...@candle.me.uk>
Authored: Wed Dec 16 14:58:02 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Dec 24 09:45:31 2015 +0100

----------------------------------------------------------------------
 .../component/aws/ddbstream/ShardList.java      | 67 ++++++++++++++++++++
 .../ShardListAfterSequenceParametrised.java     | 57 +++++++++++++++++
 .../ShardListAtSequenceParametrised.java        | 57 +++++++++++++++++
 .../component/aws/ddbstream/ShardListTest.java  | 24 ++++++-
 4 files changed, 204 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e2b9d91c/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
index a0df179..3ae1c4e 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
@@ -21,6 +21,11 @@ import java.util.HashMap;
 import java.util.Map;
 
 import com.amazonaws.services.dynamodbv2.model.Shard;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,6 +75,35 @@ class ShardList {
         throw new IllegalStateException("Unable to find a shard with no children " + shards);
     }
 
+    Shard afterSeq(String sequenceNumber) {
+        return atAfterSeq(sequenceNumber, After.INSTANCE);
+    }
+
+    Shard atSeq(String sequenceNumber) {
+        return atAfterSeq(sequenceNumber, At.INSTANCE);
+    }
+
+    Shard atAfterSeq(String sequenceNumber, AtAfterCondition condition) {
+        BigInteger atAfter = new BigInteger(sequenceNumber);
+        List<Shard> sorted = new ArrayList<>();
+        sorted.addAll(shards.values());
+        Collections.sort(sorted, StartingSequenceNumberComparator.INSTANCE);
+        for (Shard shard : sorted) {
+            if (shard.getSequenceNumberRange().getEndingSequenceNumber() != null) {
+                BigInteger end = new BigInteger(shard.getSequenceNumberRange().getEndingSequenceNumber());
+                // essentially: after < end or after <= end
+                if (condition.matches(atAfter, end)) {
+                    return shard;
+                }
+
+            }
+        }
+        if (shards.size() > 0) {
+            return sorted.get(sorted.size()-1);
+        }
+        throw new IllegalStateException("Unable to find a shard with appropriate sequence numbers for " + sequenceNumber + " in " + shards);
+    }
+
     /**
      * Removes shards that are older than the provided shard.
      * Does not remove the provided shard.
@@ -95,4 +129,37 @@ class ShardList {
     public String toString() {
         return "ShardList{" + "shards=" + shards + '}';
     }
+
+    private interface AtAfterCondition {
+        boolean matches(BigInteger sequenceNumber, BigInteger option);
+    }
+
+    private static enum After implements AtAfterCondition {
+        INSTANCE() {
+            @Override
+            public boolean matches(BigInteger providedSequenceNumber, BigInteger shardSequenceNumber) {
+                return providedSequenceNumber.compareTo(shardSequenceNumber) < 0;
+            }
+        }
+    }
+
+    private static enum At implements AtAfterCondition {
+        INSTANCE() {
+            @Override
+            public boolean matches(BigInteger providedSequenceNumber, BigInteger shardSequenceNumber) {
+                return providedSequenceNumber.compareTo(shardSequenceNumber) <= 0;
+            }
+        }
+    }
+
+    private static enum StartingSequenceNumberComparator implements Comparator<Shard> {
+        INSTANCE() {
+            @Override
+            public int compare(Shard o1, Shard o2) {
+                BigInteger i1 = new BigInteger(o1.getSequenceNumberRange().getStartingSequenceNumber());
+                BigInteger i2 = new BigInteger(o2.getSequenceNumberRange().getStartingSequenceNumber());
+                return i1.compareTo(i2);
+            }
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/e2b9d91c/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAfterSequenceParametrised.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAfterSequenceParametrised.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAfterSequenceParametrised.java
new file mode 100644
index 0000000..c33ebe2
--- /dev/null
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAfterSequenceParametrised.java
@@ -0,0 +1,57 @@
+package org.apache.camel.component.aws.ddbstream;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class ShardListAfterSequenceParametrised {
+
+
+    @Parameterized.Parameters
+    public static Collection<Object[]> paramaters() {
+        List<Object[]> results = new ArrayList<>();
+        results.add(new Object[]{"0", "a"});
+        results.add(new Object[]{"3", "a"});
+        results.add(new Object[]{"6", "b"});
+        results.add(new Object[]{"8", "b"});
+        results.add(new Object[]{"15", "c"});
+        results.add(new Object[]{"16", "d"});
+        results.add(new Object[]{"18", "d"});
+        results.add(new Object[]{"25", "d"});
+        results.add(new Object[]{"30", "d"});
+        return results;
+    }
+
+    private ShardList undertest;
+
+    private final String inputSequenceNumber;
+    private final String expectedShardId;
+
+    public ShardListAfterSequenceParametrised(String inputSequenceNumber, String expectedShardId) {
+        this.inputSequenceNumber = inputSequenceNumber;
+        this.expectedShardId = expectedShardId;
+    }
+
+    @Before
+    public void setup() throws Exception {
+        undertest = new ShardList();
+        undertest.addAll(ShardListTest.createShardsWithSequenceNumbers(null,
+                "a", "1", "5",
+                "b", "8", "15",
+                "c", "16", "16",
+                "d", "20", null
+        ));
+    }
+
+    @Test
+    public void assertions() throws Exception {
+        assertThat(undertest.afterSeq(inputSequenceNumber).getShardId(), is(expectedShardId));
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e2b9d91c/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAtSequenceParametrised.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAtSequenceParametrised.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAtSequenceParametrised.java
new file mode 100644
index 0000000..cf15021
--- /dev/null
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAtSequenceParametrised.java
@@ -0,0 +1,57 @@
+package org.apache.camel.component.aws.ddbstream;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class ShardListAtSequenceParametrised {
+
+
+    @Parameterized.Parameters
+    public static Collection<Object[]> paramaters() {
+        List<Object[]> results = new ArrayList<>();
+        results.add(new Object[]{"0", "a"});
+        results.add(new Object[]{"3", "a"});
+        results.add(new Object[]{"6", "b"});
+        results.add(new Object[]{"8", "b"});
+        results.add(new Object[]{"15", "b"});
+        results.add(new Object[]{"16", "c"});
+        results.add(new Object[]{"18", "d"});
+        results.add(new Object[]{"25", "d"});
+        results.add(new Object[]{"30", "d"});
+        return results;
+    }
+
+    private ShardList undertest;
+
+    private final String inputSequenceNumber;
+    private final String expectedShardId;
+
+    public ShardListAtSequenceParametrised(String inputSequenceNumber, String expectedShardId) {
+        this.inputSequenceNumber = inputSequenceNumber;
+        this.expectedShardId = expectedShardId;
+    }
+
+    @Before
+    public void setup() throws Exception {
+        undertest = new ShardList();
+        undertest.addAll(ShardListTest.createShardsWithSequenceNumbers(null,
+                "a", "1", "5",
+                "b", "8", "15",
+                "c", "16", "16",
+                "d", "20", null
+        ));
+    }
+
+    @Test
+    public void assertions() throws Exception {
+        assertThat(undertest.atSeq(inputSequenceNumber).getShardId(), is(expectedShardId));
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e2b9d91c/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
index 1b7249a..e181d7e 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.aws.ddbstream;
 
 
+import com.amazonaws.services.dynamodbv2.model.SequenceNumberRange;
 import java.util.ArrayList;
 import java.util.List;
 import com.amazonaws.services.dynamodbv2.model.Shard;
@@ -24,6 +25,7 @@ import com.amazonaws.services.dynamodbv2.model.Shard;
 import org.junit.Test;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
+import org.junit.Ignore;
 
 public class ShardListTest {
 
@@ -118,7 +120,27 @@ public class ShardListTest {
         assertThat(shards.first().getShardId(), is("c"));
     }
 
-    List<Shard> createShards(String initialParent, String... shardIds) {
+    static List<Shard> createShardsWithSequenceNumbers(String initialParent, String... shardIdsAndSeqNos) {
+        String previous = initialParent;
+        List<Shard> result = new ArrayList<>();
+        for (int i = 0; i < shardIdsAndSeqNos.length; i += 3) {
+            String id = shardIdsAndSeqNos[i];
+            String seqStart = shardIdsAndSeqNos[i+1];
+            String seqEnd = shardIdsAndSeqNos[i+2];
+            result.add(new Shard()
+                    .withShardId(id)
+                    .withParentShardId(previous)
+                    .withSequenceNumberRange(new SequenceNumberRange()
+                        .withStartingSequenceNumber(seqStart)
+                        .withEndingSequenceNumber(seqEnd)
+                    )
+            );
+            previous = id;
+        }
+        return result;
+    }
+
+    static List<Shard> createShards(String initialParent, String... shardIds) {
         String previous = initialParent;
         List<Shard> result = new ArrayList<>();
         for (String s : shardIds) {