You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/08/12 19:00:50 UTC
[9/9] git commit: Fixes issues. Tests passing
Fixes issues. Tests passing
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/bef89288
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/bef89288
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/bef89288
Branch: refs/heads/USERGRID-188
Commit: bef89288f0e540cedf802f3b7a87150cbc30bc0c
Parents: a080f09
Author: Todd Nine <to...@apache.org>
Authored: Tue Aug 12 11:00:17 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Tue Aug 12 11:00:17 2014 -0600
----------------------------------------------------------------------
.../astyanax/MultiKeyColumnNameIterator.java | 56 ++++++++++++--------
.../impl/shard/impl/NodeShardCacheImpl.java | 10 ----
.../shard/impl/ShardEntryGroupIterator.java | 3 ++
.../impl/shard/NodeShardAllocationTest.java | 20 ++++++-
.../shard/impl/ShardEntryGroupIteratorTest.java | 34 ++----------
5 files changed, 58 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bef89288/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java
index f8e06c9..c5a8c95 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java
@@ -24,11 +24,14 @@ package org.apache.usergrid.persistence.core.astyanax;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.NoSuchElementException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.usergrid.persistence.core.rx.OrderedMerge;
@@ -74,7 +77,7 @@ public class MultiKeyColumnNameIterator<C, T> implements Iterable<T>, Iterator<T
Observable<T> merged = OrderedMerge.orderedMerge( comparator, bufferSize, observables ).distinctUntilChanged();
- iterator = new InnerIterator( bufferSize );
+ iterator = new InnerIterator(bufferSize);
merged.subscribe( iterator );
}
@@ -111,20 +114,19 @@ public class MultiKeyColumnNameIterator<C, T> implements Iterable<T>, Iterator<T
*/
private final class InnerIterator<T> extends Subscriber<T> implements Iterator<T> {
+ private CountDownLatch startLatch = new CountDownLatch( 1 );
- // private final Semaphore runSemaphore;
private final LinkedBlockingQueue<T> queue;
- private final CountDownLatch startLatch = new CountDownLatch( 1 );
-
private Throwable error;
private boolean done = false;
+ private T next;
+
- private InnerIterator( int bufferSize ) {
- // runSemaphore = new Semaphore( 0 );
- queue = new LinkedBlockingQueue<>( bufferSize );
+ private InnerIterator( int maxSize ) {
+ queue = new LinkedBlockingQueue<>( maxSize );
}
@@ -132,21 +134,36 @@ public class MultiKeyColumnNameIterator<C, T> implements Iterable<T>, Iterator<T
public boolean hasNext() {
+ //we're done
+ if ( next != null ) {
+ return true;
+ }
+
+
try {
startLatch.await();
}
catch ( InterruptedException e ) {
- throw new RuntimeException( "Exception thrown waiting for subscription to start", e );
+ throw new RuntimeException( "Unable to wait for start of submission" );
}
- //we only return false when we're done and the queue is empty
- return !done || !queue.isEmpty();
+
+ //this is almost a busy wait, and is intentional, if we have nothing to poll, we want to get it as soon
+ //as it's available. We generally only hit this once
+ do {
+ next = queue.poll();
+ }
+ while ( next == null && !done );
+
+
+ return next != null;
}
@Override
public T next() {
+
if ( error != null ) {
throw new RuntimeException( "An error occurred when populating the iterator", error );
}
@@ -156,15 +173,9 @@ public class MultiKeyColumnNameIterator<C, T> implements Iterable<T>, Iterator<T
}
-
- try {
- return queue.take();
- }
- catch ( InterruptedException e ) {
- throw new RuntimeException( "Unable to take from queue" );
- }
-
-
+ T toReturn = next;
+ next = null;
+ return toReturn;
}
@@ -184,21 +195,22 @@ public class MultiKeyColumnNameIterator<C, T> implements Iterable<T>, Iterator<T
@Override
public void onError( final Throwable e ) {
error = e;
+ done = true;
startLatch.countDown();
}
@Override
public void onNext( final T t ) {
- Preconditions.checkArgument(t != null, "t cannot be null");
- //offer the value, but block if we can't add the capacity
+ //may block if we get full, that's expected behavior
try {
queue.put( t );
}
catch ( InterruptedException e ) {
- throw new RuntimeException( "Unable to to put into queue", e );
+ throw new RuntimeException( "Unable to take from queue" );
}
+
startLatch.countDown();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bef89288/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
index a4867f9..3d3387e 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
@@ -294,16 +294,6 @@ public class NodeShardCacheImpl implements NodeShardCache {
}
-// private static class ShardEntriesComparator implements Comparator<Long> {
-//
-// private static final ShardEntriesComparator INSTANCE = new ShardEntriesComparator();
-//
-//
-// @Override
-// public int compare( final Long o1, final Long o2 ) {
-// return Long.compare( o1, o2 ) * -1;
-// }
-// }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bef89288/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
index 9b7ae26..c8a884b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
@@ -9,6 +9,8 @@ import org.apache.commons.collections4.iterators.PushbackIterator;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+import com.google.common.base.Preconditions;
+
/**
* Utility class that will take an iterator of all shards, and combine them into an iterator
@@ -28,6 +30,7 @@ public class ShardEntryGroupIterator implements Iterator<ShardEntryGroup> {
* @param minDelta The minimum delta we allow to consider shards the same group
*/
public ShardEntryGroupIterator( final Iterator<Shard> shardIterator, final long minDelta ) {
+ Preconditions.checkArgument(shardIterator.hasNext(), "Shard iterator must have shards present");
this.sourceIterator = new PushbackIterator( shardIterator );
this.minDelta = minDelta;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bef89288/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
index 32cbc40..dcbd243 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
@@ -28,6 +28,8 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
+import org.apache.cassandra.thrift.Mutation;
+
import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.GraphFig;
@@ -42,6 +44,7 @@ import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import com.google.common.base.Optional;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import static junit.framework.TestCase.assertTrue;
import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
@@ -495,7 +498,7 @@ public class NodeShardAllocationTest {
@Test
- public void noShardsReturns() {
+ public void noShardsReturns() throws ConnectionException {
final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
@@ -527,19 +530,32 @@ public class NodeShardAllocationTest {
* Mock up returning an empty iterator, our audit shouldn't create a new shard
*/
when( edgeShardSerialization
- .getShardMetaData( same( scope ), same( nodeId ), eq( NodeType.TARGET ), any( Optional.class ),
+ .getShardMetaData( same( scope ), same( nodeId ), same( NodeType.TARGET ), any( Optional.class ),
same( type ), same( subType ) ) ).thenReturn( Collections.<Shard>emptyList().iterator() );
+
+
+ ArgumentCaptor<Shard> shardArgumentCaptor = ArgumentCaptor.forClass( Shard.class );
+
+ when(edgeShardSerialization.writeShardMeta( same(scope), same(nodeId), same(NodeType.TARGET), shardArgumentCaptor.capture() , same(type), same(subType) )).thenReturn( batch );
+
+
final Iterator<ShardEntryGroup> result =
approximation.getShards( scope, nodeId, NodeType.TARGET, Optional.<Shard>absent(), type, subType );
+
ShardEntryGroup shardEntryGroup = result.next();
final Shard rootShard = new Shard( 0, 0, true );
assertEquals("Shard size expected", 1, shardEntryGroup.entrySize());
+
+ //ensure we persisted the new shard.
+ assertEquals("Root shard was persisted", rootShard, shardArgumentCaptor.getValue());
+
+
//now verify all 4 are in this group. This is because the first shard (0,0) (n-1_ may be the only shard other
//nodes see while we're rolling our state. This means it should be read and merged from as well
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bef89288/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java
index ecacd8e..5b20647 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java
@@ -39,41 +39,13 @@ import static org.junit.Assert.assertNotNull;
public class ShardEntryGroupIteratorTest {
- @Test
+ @Test(expected = IllegalArgumentException.class)
public void noShards(){
-
- final Shard minShard = new Shard(0, 0, true);
final long delta = 10000;
final Iterator<Shard> noShards = Collections.<Shard>emptyList().iterator();
- ShardEntryGroupIterator entryGroupIterator = new ShardEntryGroupIterator(noShards, delta);
-
- assertTrue("Root shard always present", entryGroupIterator.hasNext());
-
- ShardEntryGroup group = entryGroupIterator.next();
-
- assertNotNull("Group returned", group);
-
- Collection<Shard> readShards = group.getReadShards();
-
- assertEquals("Min shard present", 1, readShards.size());
-
- assertTrue("Min shard present", readShards.contains( minShard ));
-
-
- Collection<Shard> writeShards = group.getWriteShards( 0 );
-
- assertEquals("Min shard present", 1, writeShards.size());
-
- assertTrue("Min shard present", writeShards.contains( minShard ));
-
-
- writeShards = group.getWriteShards( Long.MAX_VALUE );
-
- assertEquals("Min shard present", 1, writeShards.size());
-
- assertTrue("Min shard present", writeShards.contains( minShard ));
-
+ //should blow up, our iterator is empty
+ new ShardEntryGroupIterator(noShards, delta);
}