You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/08/12 19:00:47 UTC

[6/9] git commit: Adding utilities. Need to fix bad multiget semantics for a merged iterator

Adding utilities.  Need to fix bad multiget semantics for a merged iterator


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/aafb63c8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/aafb63c8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/aafb63c8

Branch: refs/heads/USERGRID-188
Commit: aafb63c83f8d540b40412a4e25efbd5131cc9dda
Parents: 8786460
Author: Todd Nine <to...@apache.org>
Authored: Thu Aug 7 16:35:07 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Thu Aug 7 16:35:07 2014 -0600

----------------------------------------------------------------------
 .../astyanax/MultiKeyColumnNameIterator.java    | 152 +++++++++++++++++++
 .../impl/shard/NodeShardCache.java              |   3 +-
 .../impl/shard/impl/EdgeSearcher.java           |  29 +++-
 .../shard/impl/NodeShardAllocationImpl.java     |  18 ---
 .../impl/shard/impl/NodeShardCacheImpl.java     |   1 -
 .../impl/shard/impl/ShardRowIterator.java       |  18 ++-
 6 files changed, 189 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aafb63c8/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java
new file mode 100644
index 0000000..dede4ab
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java
@@ -0,0 +1,152 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+package org.apache.usergrid.persistence.core.astyanax;
+
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Observable;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import com.netflix.astyanax.model.Column;
+import com.netflix.astyanax.model.Row;
+import com.netflix.astyanax.query.RowQuery;
+import com.netflix.astyanax.query.RowSliceQuery;
+import com.netflix.hystrix.HystrixCommandGroupKey;
+
+
+/**
+ * Simple iterator that wraps a collection of ColumnNameIterators.  We do this because we can't page with a multiRangeScan
+ * correctly for multiple trips.  As a result, we do this since only 1 iterator with minimum values could potentially
+ * feed the entire result set.
+ *
+ * Compares the parsed values and puts them in order. If more than one row key emits the same value
+ * the first value is selected, and ignored from subsequent iterators.
+ *
+ */
+public class MultiKeyColumnNameIterator<C, T> implements Iterable<T>, Iterator<T> {
+
+
+    private static final HystrixCommandGroupKey GROUP_KEY = HystrixCommandGroupKey.Factory.asKey( "CassRead" );
+
+
+   private final Comparator<T> comparator;
+
+
+    public MultiKeyColumnNameIterator(Collection<ColumnNameIterator<C, T>> columnNameIterators, final Comparator<T> comparator) {
+        //NOTE(review): columnNameIterators is accepted but never read or stored in this constructor
+        this.comparator = comparator;
+
+        //runs the first load immediately, so construction can fail with the RuntimeException from advanceIterator()
+        advanceIterator();
+    }
+
+
+    @Override
+    public Iterator<T> iterator() {
+        return this; //hands back this same instance, not a fresh iterator
+    }
+
+
+    @Override
+    public boolean hasNext() {
+        //if we've exhausted this iterator, try to advance to the next set
+        if ( sourceIterator.hasNext() ) {
+            return true;
+        }
+
+        //advance the iterator, to the next page, there could be more
+        advanceIterator();
+
+        return sourceIterator.hasNext();
+    }
+
+
+    @Override
+    public T next() {
+        //hasNext() also triggers loading further results when the current ones are used up
+        if ( hasNext() ) {
+            return parser.parseColumn( sourceIterator.next() );
+        }
+
+        throw new NoSuchElementException();
+    }
+
+
+    @Override
+    public void remove() {
+        sourceIterator.remove(); //delegates straight to the current source iterator
+    }
+
+
+    /**
+     * Execute the query again and set the results
+     */
+    private void advanceIterator() {
+        //NOTE(review): rowQuery.execute() is called directly below; no Hystrix command is visible in this method
+        //run producing the values within a hystrix command.  This way we'll time out if the read takes too long
+        try {
+            sourceIterator = rowQuery.execute().getResult().iterator();
+        }
+        catch ( ConnectionException e ) {
+            throw new RuntimeException( "Unable to get next page", e );
+        }
+    }
+
+
+    private final class MultiColumIteratorSet{
+
+        //NOTE(review): work in progress, nothing in this class assigns the map yet
+        private TreeMap<T, ColumnNameIterator<C, T>> sources;
+
+
+        /**
+         * @return true once the sources map has been assigned
+         */
+        public boolean isInitialized(){
+            return sources != null;
+        }
+
+
+        /**
+         * @return true only when the sources map exists and holds at least one entry
+         */
+        public boolean hasNext(){
+            //was "sources == null && sources.size() > 0", which could only be false or throw a NullPointerException
+            return sources != null && !sources.isEmpty();
+        }
+
+
+        //NOTE(review): unfinished stub, the for statement has no variable, iterable or body and will not compile
+        public void initialize(Collection<ColumnNameIterator<C, T>> columnNameIterators){
+
+            for (ColumnNameIterator)
+
+        }
+    }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aafb63c8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
index b39e7e5..561706a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
@@ -44,7 +44,8 @@ public interface NodeShardCache {
                                 final String... edgeType );
 
     /**
-     * Get an iterator of all versions <= the version for iterating shard entry sets
+     * Get an iterator of all versions <= the version for iterating shard entry sets.  The iterator of groups will be
+     * ordered highest to lowest, i.e. range scanning from Long.MAX_VALUE down to 0
      * @param scope
      * @param nodeId
      * @para nodeType

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aafb63c8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
index 8b39115..b9b2477 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
@@ -1,12 +1,17 @@
 package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
 
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
 import java.util.Iterator;
+import java.util.List;
 
 import org.apache.usergrid.persistence.core.astyanax.ColumnParser;
 import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
 
 import com.google.common.base.Optional;
@@ -23,7 +28,8 @@ import com.netflix.astyanax.util.RangeBuilder;
  * @param <C> The column type
  * @param <T> The parsed return type
  */
-public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Iterator<ScopedRowKey<ApplicationScope, R>> {
+public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Comparator<T>,
+        Iterator<List<ScopedRowKey<ApplicationScope, R>>> {
 
     protected final Optional<Edge> last;
     protected final long maxTimestamp;
@@ -47,12 +53,21 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Itera
 
 
     @Override
-    public ScopedRowKey<ApplicationScope, R> next() {
-        /**
-         * TODO Shard fix this
-         */
-        return ScopedRowKey
-                .fromKey( scope, generateRowKey( shards.next().getCompactionTarget().getShardIndex() ) );
+    public List<ScopedRowKey<ApplicationScope, R>> next() {
+        //advance the shard group iterator exactly once per call; a second shards.next() in the loop
+        //would consume an extra group on every call and build keys for a different group than was sized
+        final Collection<Shard> readShards = shards.next().getReadShards();
+
+        final List<ScopedRowKey<ApplicationScope, R>> rowKeys = new ArrayList<>( readShards.size() );
+
+        //one row key per shard returned by getReadShards() for this group
+        for ( final Shard shard : readShards ) {
+            final ScopedRowKey<ApplicationScope, R> rowKey =
+                    ScopedRowKey.fromKey( scope, generateRowKey( shard.getShardIndex() ) );
+            rowKeys.add( rowKey );
+        }
+
+        return rowKeys;
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aafb63c8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index 286ed67..4052d8f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -251,22 +251,4 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
 
         return timeService.getCurrentTime() - minDelta;
     }
-
-
-    /**
-     * Sorts by minimum time first.  If 2 times are equal, the min shard value is taken
-     */
-    private static final class MinShardTimeComparator implements Comparator<Shard> {
-
-        @Override
-        public int compare( final Shard s1, final Shard s2 ) {
-            int result = Long.compare( s1.getCreatedTime(), s2.getCreatedTime() );
-
-            if ( result == 0 ) {
-                result = Long.compare( s1.getShardIndex(), s2.getShardIndex() );
-            }
-
-            return result;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aafb63c8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
index 9fcd7d6..a4867f9 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
@@ -23,7 +23,6 @@ import java.beans.PropertyChangeEvent;
 import java.beans.PropertyChangeListener;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.TreeMap;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aafb63c8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardRowIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardRowIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardRowIterator.java
index 1da85e1..d33f235 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardRowIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardRowIterator.java
@@ -2,9 +2,11 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
 
 
 import java.util.Iterator;
+import java.util.List;
 import java.util.NoSuchElementException;
 
 import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
+import org.apache.usergrid.persistence.core.astyanax.MultiKeyColumnNameIterator;
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
 import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -12,6 +14,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.model.ConsistencyLevel;
 import com.netflix.astyanax.query.RowQuery;
+import com.netflix.astyanax.query.RowSliceQuery;
 import com.netflix.astyanax.util.RangeBuilder;
 
 
@@ -99,14 +102,19 @@ public class ShardRowIterator<R, C, T> implements Iterator<T> {
         //set the range into the search
         searcher.setRange( rangeBuilder );
 
-        final ScopedRowKey<ApplicationScope, R> rowKey = searcher.next();
+        /**
+         * Get our list of slices
+         */
+        final List<ScopedRowKey<ApplicationScope, R>> rowKeys = searcher.next();
+
+
+        RowSliceQuery<ScopedRowKey<ApplicationScope, R>, C> query =
+                keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKeySlice( rowKeys )
+                        .withColumnRange( rangeBuilder.build() );
 
 
-        RowQuery<ScopedRowKey<ApplicationScope, R>, C> query =
-                keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKey( rowKey )
-                        .autoPaginate( true ).withColumnRange( rangeBuilder.build() );
+        currentColumnIterator = new MultiKeyColumnNameIterator<>(query, searcher, searcher, searcher.hasNext() );
 
 
-        currentColumnIterator = new ColumnNameIterator<C, T>( query, searcher, searcher.hasPage() );
     }
 }