You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/08/12 19:00:50 UTC

[9/9] git commit: Fixes issues. Tests passing

Fixes issues.  Tests passing


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/bef89288
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/bef89288
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/bef89288

Branch: refs/heads/USERGRID-188
Commit: bef89288f0e540cedf802f3b7a87150cbc30bc0c
Parents: a080f09
Author: Todd Nine <to...@apache.org>
Authored: Tue Aug 12 11:00:17 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Tue Aug 12 11:00:17 2014 -0600

----------------------------------------------------------------------
 .../astyanax/MultiKeyColumnNameIterator.java    | 56 ++++++++++++--------
 .../impl/shard/impl/NodeShardCacheImpl.java     | 10 ----
 .../shard/impl/ShardEntryGroupIterator.java     |  3 ++
 .../impl/shard/NodeShardAllocationTest.java     | 20 ++++++-
 .../shard/impl/ShardEntryGroupIteratorTest.java | 34 ++----------
 5 files changed, 58 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bef89288/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java
index f8e06c9..c5a8c95 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java
@@ -24,11 +24,14 @@ package org.apache.usergrid.persistence.core.astyanax;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.NoSuchElementException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Exchanger;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.usergrid.persistence.core.rx.OrderedMerge;
 
@@ -74,7 +77,7 @@ public class MultiKeyColumnNameIterator<C, T> implements Iterable<T>, Iterator<T
         Observable<T> merged = OrderedMerge.orderedMerge( comparator, bufferSize, observables ).distinctUntilChanged();
 
 
-        iterator = new InnerIterator( bufferSize );
+        iterator = new InnerIterator(bufferSize);
 
         merged.subscribe( iterator );
     }
@@ -111,20 +114,19 @@ public class MultiKeyColumnNameIterator<C, T> implements Iterable<T>, Iterator<T
      */
     private final class InnerIterator<T> extends Subscriber<T> implements Iterator<T> {
 
+        private CountDownLatch startLatch = new CountDownLatch( 1 );
 
-        //        private final Semaphore runSemaphore;
         private final LinkedBlockingQueue<T> queue;
 
-        private final CountDownLatch startLatch = new CountDownLatch( 1 );
-
 
         private Throwable error;
         private boolean done = false;
 
+        private T next;
+
 
-        private InnerIterator( int bufferSize ) {
-            //            runSemaphore = new Semaphore( 0 );
-            queue = new LinkedBlockingQueue<>( bufferSize );
+        private InnerIterator( int maxSize ) {
+            queue = new LinkedBlockingQueue<>( maxSize );
         }
 
 
@@ -132,21 +134,36 @@ public class MultiKeyColumnNameIterator<C, T> implements Iterable<T>, Iterator<T
         public boolean hasNext() {
 
 
+            //we're done
+            if ( next != null ) {
+                return true;
+            }
+
+
             try {
                 startLatch.await();
             }
             catch ( InterruptedException e ) {
-                throw new RuntimeException( "Exception thrown waiting for subscription to start", e );
+                throw new RuntimeException( "Unable to wait for start of submission" );
             }
 
 
-            //we only return false when we're done and the queue is empty
-            return !done || !queue.isEmpty();
+
+            //this is almost a busy wait, and is intentional, if we have nothing to poll, we want to get it as soon
+            //as it's available.  We generally only hit this once
+            do {
+                next = queue.poll();
+            }
+            while ( next == null && !done );
+
+
+            return next != null;
         }
 
 
         @Override
         public T next() {
+
             if ( error != null ) {
                 throw new RuntimeException( "An error occurred when populating the iterator", error );
             }
@@ -156,15 +173,9 @@ public class MultiKeyColumnNameIterator<C, T> implements Iterable<T>, Iterator<T
             }
 
 
-
-            try {
-                return  queue.take();
-            }
-            catch ( InterruptedException e ) {
-                throw new RuntimeException( "Unable to take from queue" );
-            }
-
-
+            T toReturn = next;
+            next = null;
+            return toReturn;
         }
 
 
@@ -184,21 +195,22 @@ public class MultiKeyColumnNameIterator<C, T> implements Iterable<T>, Iterator<T
         @Override
         public void onError( final Throwable e ) {
             error = e;
+            done = true;
             startLatch.countDown();
         }
 
 
         @Override
         public void onNext( final T t ) {
-            Preconditions.checkArgument(t != null, "t cannot be null");
 
-            //offer the value, but block if we can't add the capacity
+            //may block if we get full, that's expected behavior
             try {
                 queue.put( t );
             }
             catch ( InterruptedException e ) {
-                throw new RuntimeException( "Unable to to put into queue", e );
+                throw new RuntimeException( "Unable to take from queue" );
             }
+
             startLatch.countDown();
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bef89288/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
index a4867f9..3d3387e 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
@@ -294,16 +294,6 @@ public class NodeShardCacheImpl implements NodeShardCache {
         }
 
 
-//        private static class ShardEntriesComparator implements Comparator<Long> {
-//
-//            private static final ShardEntriesComparator INSTANCE = new ShardEntriesComparator();
-//
-//
-//            @Override
-//            public int compare( final Long o1, final Long o2 ) {
-//                return Long.compare( o1, o2 ) * -1;
-//            }
-//        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bef89288/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
index 9b7ae26..c8a884b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
@@ -9,6 +9,8 @@ import org.apache.commons.collections4.iterators.PushbackIterator;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
 
+import com.google.common.base.Preconditions;
+
 
 /**
  * Utility class that will take an iterator of all shards, and combine them into an iterator
@@ -28,6 +30,7 @@ public class ShardEntryGroupIterator implements Iterator<ShardEntryGroup> {
      * @param minDelta The minimum delta we allow to consider shards the same group
      */
     public ShardEntryGroupIterator( final Iterator<Shard> shardIterator, final long minDelta ) {
+        Preconditions.checkArgument(shardIterator.hasNext(), "Shard iterator must have shards present");
         this.sourceIterator = new PushbackIterator( shardIterator );
         this.minDelta = minDelta;
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bef89288/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
index 32cbc40..dcbd243 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
@@ -28,6 +28,8 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
+import org.apache.cassandra.thrift.Mutation;
+
 import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.GraphFig;
@@ -42,6 +44,7 @@ import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import com.google.common.base.Optional;
 import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import static junit.framework.TestCase.assertTrue;
 import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
@@ -495,7 +498,7 @@ public class NodeShardAllocationTest {
 
 
     @Test
-    public void noShardsReturns() {
+    public void noShardsReturns() throws ConnectionException {
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
 
         final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
@@ -527,19 +530,32 @@ public class NodeShardAllocationTest {
          * Mock up returning an empty iterator, our audit shouldn't create a new shard
          */
         when( edgeShardSerialization
-                .getShardMetaData( same( scope ), same( nodeId ), eq( NodeType.TARGET ), any( Optional.class ),
+                .getShardMetaData( same( scope ), same( nodeId ), same( NodeType.TARGET ), any( Optional.class ),
                         same( type ), same( subType ) ) ).thenReturn( Collections.<Shard>emptyList().iterator() );
 
+
+
+        ArgumentCaptor<Shard> shardArgumentCaptor = ArgumentCaptor.forClass( Shard.class );
+
+        when(edgeShardSerialization.writeShardMeta( same(scope), same(nodeId), same(NodeType.TARGET), shardArgumentCaptor.capture() , same(type), same(subType) )).thenReturn( batch );
+
+
         final Iterator<ShardEntryGroup> result =
                 approximation.getShards( scope, nodeId, NodeType.TARGET, Optional.<Shard>absent(), type, subType );
 
 
+
         ShardEntryGroup shardEntryGroup = result.next();
 
         final Shard rootShard = new Shard( 0, 0, true );
 
         assertEquals("Shard size expected", 1, shardEntryGroup.entrySize());
 
+
+        //ensure we persisted the new shard.
+        assertEquals("Root shard was persisted", rootShard, shardArgumentCaptor.getValue());
+
+
         //now verify all 4 are in this group.  This is because the first shard (0,0) (n-1_ may be the only shard other
         //nodes see while we're rolling our state.  This means it should be read and merged from as well
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bef89288/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java
index ecacd8e..5b20647 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java
@@ -39,41 +39,13 @@ import static org.junit.Assert.assertNotNull;
 
 public class ShardEntryGroupIteratorTest {
 
-    @Test
+    @Test(expected = IllegalArgumentException.class)
     public void noShards(){
-
-        final Shard minShard = new Shard(0, 0, true);
         final long delta = 10000;
         final Iterator<Shard> noShards = Collections.<Shard>emptyList().iterator();
 
-        ShardEntryGroupIterator entryGroupIterator = new ShardEntryGroupIterator(noShards, delta);
-
-        assertTrue("Root shard always present", entryGroupIterator.hasNext());
-
-        ShardEntryGroup group = entryGroupIterator.next();
-
-        assertNotNull("Group returned", group);
-
-        Collection<Shard> readShards = group.getReadShards();
-
-        assertEquals("Min shard present", 1, readShards.size());
-
-        assertTrue("Min shard present", readShards.contains( minShard ));
-
-
-        Collection<Shard> writeShards = group.getWriteShards( 0 );
-
-        assertEquals("Min shard present", 1, writeShards.size());
-
-        assertTrue("Min shard present", writeShards.contains( minShard ));
-
-
-        writeShards = group.getWriteShards( Long.MAX_VALUE );
-
-        assertEquals("Min shard present", 1, writeShards.size());
-
-        assertTrue("Min shard present", writeShards.contains( minShard ));
-
+        //should blow up, our iterator is empty
+        new ShardEntryGroupIterator(noShards, delta);
 
     }